2009-09-15 10 views
80

mia domanda riguarda this question chiesto in precedenza. Nelle situazioni in cui sto usando una coda per la comunicazione tra i thread dei produttori e dei consumatori le persone generalmente consigliano di utilizzare LinkedBlockingQueue o ConcurrentLinkedQueue?LinkedBlockingQueue vs ConcurrentLinkedQueue

Quali sono i vantaggi/gli svantaggi dell'utilizzo uno sull'altro?

La differenza principale che posso vedere da una prospettiva API è che un LinkedBlockingQueue può essere facoltativamente delimitata.

risposta

83

Per un filo produttore/consumatore, non sono sicuro che ConcurrentLinkedQueue è ancora una scelta ragionevole - non implementa BlockingQueue, che è l'interfaccia fondamentale per le code produttore/consumatore IMO. Dovresti chiamare poll(), aspettare un po 'se non hai trovato nulla, e poi eseguire il polling di nuovo ecc ... portando a ritardi quando arriva un nuovo elemento, e inefficienze quando è vuoto (a causa di un risveglio inutile dal sonno).

Dalla documentazione per BlockingQueue:

BlockingQueue implementazioni sono progettati per essere utilizzati in primo luogo per le code produttore-consumatore

Lo so che non è così strettamente dicono che solo il blocco code dovrebbe essere utilizzato per le code produttore-consumatore, ma anche così ...

+3

Grazie Jon - non avevo notato. Quindi, dove/perché dovresti usare ConcurrentLinkedQueue? – Adamski

+18

Quando è necessario accedere alla coda da molti thread, ma non è necessario "attendere" su di esso. –

+1

Un 'ConcurrentLinkedQueue' è anche utile se il thread sta controllando più code. Ad esempio, in un server multi-tenant. Supponendo che per ragioni di isolamento non si usi una singola coda di blocco e un discriminatore di inquilino. – LateralFractal

0

Un'altra soluzione (che non scala bene) è canali Rendezvous: java.u til.concurrent SynchronousQueue

1

Se la coda non è espandibile e contiene solo un produttore/thread consumatore. È possibile utilizzare la coda senza blocco (non è necessario bloccare l'accesso ai dati).

12

LinkedBlockingQueue blocchi del consumatore o del produttore quando la coda è vuota o piena e il rispettivo filo consumatore/produttore viene addormentato. Ma questa funzione di blocco ha un costo: ogni operazione put o take è bloccata tra produttori o consumatori (se molti), quindi in scenari con molti produttori/consumatori l'operazione potrebbe essere più lenta.

ConcurrentLinkedQueue non utilizza i blocchi, ma CAS, sulle operazioni di inserimento/ricezione che potenzialmente riducono la contesa con molti thread di produzione e di consumo. Ma essendo un "aspettare liberi" struttura dati, ConcurrentLinkedQueue volontà non bloccare quando è vuoto, il che significa che il consumatore avrà bisogno per affrontare il ritorno take()null valori da "attesa occupato", per esempio, con il filo del consumatore di mangiare la CPU.

Quindi qual è "migliore" dipende dal numero di thread di consumo, sul tasso che consumano/prodotti, ecc È necessario un punto di riferimento per ogni scenario.

Un particolare caso d'uso in cui la ConcurrentLinkedQueue è chiaramente migliore è quando i produttori prima di produrre qualcosa e finire il loro lavoro mettendo il lavoro nella coda e solo dopo consumatori inizia a consumare, sapendo che sarà fatto quando la coda è vuoto. (qui non c'è concorrenza tra produttore-consumatore ma solo tra produttore-produttore e consumatore-consumatore)

+0

un dubbio qui. Come hai detto, il consumatore aspetta quando la coda è vuota..come aspetta? Chi lo notificherà per non aspettare? – Brinal

+0

@brindal L'unico modo per aspettare, che io sappia, è in un ciclo. Questo è un problema significativo a cui non è stata data molta attenzione nelle risposte qui. L'esecuzione di un ciclo in attesa di dati richiede molto tempo del processore. Lo saprai quando i tuoi fan inizieranno a girare. L'unico rimedio è mettere il sonno nel circuito. Quindi è un problema in un sistema con flusso di dati incoerente. Forse non capisco la risposta di AlexandruNedelcu, ma un sistema operativo in sé è un sistema concorrente, che sarebbe estremamente inefficiente se fosse pieno di loop di eventi non bloccanti. – orodbhen

38

Questa domanda merita una risposta migliore.

Java ConcurrentLinkedQueue si basa sulle famose code algorithm by Maged M. Michael and Michael L. Scott per non-blocking lock-free.

"Non bloccante" come termine qui per una risorsa contesa (la nostra coda) significa che indipendentemente da ciò che fa lo scheduler della piattaforma, come interrompere un thread, o se il thread in questione è semplicemente troppo lento, altri thread contendono per la stessa risorsa sarà ancora in grado di progredire. Ad esempio, se un blocco è coinvolto, il thread che blocca il blocco potrebbe essere interrotto e tutti i thread in attesa di quel blocco sarebbero bloccati. I blocchi intrinseci (la parola chiave synchronized) in Java possono anche avere una penalità severa per le prestazioni, ad esempio quando è implicato biased locking e si dispone di contesa, oppure dopo che la VM decide di "gonfiare" il blocco dopo un periodo di tolleranza di rotazione e bloccare i thread concorrenti ... che è il motivo per cui in molti contesti (scenari di contesa basso/medio), fare confronti e insiemi su riferimenti atomici può essere molto più efficiente e questo è esattamente ciò che stanno facendo molte strutture di dati non bloccanti.

Java ConcurrentLinkedQueue non è solo non bloccante, ma ha la proprietà eccezionale che il produttore non contende con il consumatore. In un singolo produttore/scenario singolo consumatore (SPSC), questo significa davvero che non ci sarà alcuna contesa di cui parlare. In uno scenario a più produttori/consumatori singoli, il consumatore non contenderà i produttori. Questa coda ha contesa quando più produttori provano a offer(), ma questa è la concorrenza per definizione. È fondamentalmente uno scopo generale ed efficiente coda non bloccante.

Per quanto non sia un BlockingQueue, beh, bloccare un thread per attendere su una coda è un modo terribilmente terribile di progettare sistemi concorrenti. Non farlo. Se non riesci a capire come utilizzare uno ConcurrentLinkedQueue in uno scenario consumatore/produttore, passa semplicemente alle astrazioni di livello superiore, come un buon framework per attori.

+4

Per il tuo ultimo paragrafo, perché dici che aspettare su una coda è un modo terribile di progettare sistemi concorrenti? Se abbiamo un threadgroup con 10 thread che mangiano attività da una taskqueue, cosa c'è di sbagliato nel bloccare quando il taskqueue ha meno di 10 task? – Pacerier

+9

@AlexandruNedelcu Non è possibile fare una dichiarazione radicale come "terribilmente terribile" dove molto spesso i framework degli attori che si dice di usare usano i threadpool che sono voi stessi * BlockingQueue *. Se hai bisogno di un sistema altamente reattivo e sai come gestire la contropressione (qualcosa che mitiga le code di blocco) rispetto al non blocco è chiaramente superiore. Ma spesso il blocco di IO e le code di blocco possono essere eseguite in modo non blocco, in particolare se si hanno attività a lungo termine che sono vincolate all'IO e non possono essere divise e conquistate. –

+0

@AdamGent - I framework degli attori hanno un'implementazione di cassette postali basate su code di blocco, ma a mio parere è un bug, perché il blocco non funziona oltre i confini asincroni e quindi funziona solo nelle demo. Per me questa è stata una fonte di frustrazione, poiché ad esempio la nozione di Akka di gestire l'overflow è bloccare, invece di rilasciare messaggi, fino alla versione 2.4, che non è ancora uscita. Detto questo, non credo che esistano casi d'uso per i quali le code di blocco possano essere superiori. Stai anche confondendo due cose che non dovrebbero essere confuse. Non ho parlato di bloccare I/O. –

1

FYI, provate questo --- Tipo di hacky, non eccessivamente scientifico, ma forse interessante:

import java.util.ArrayList; 
import java.util.List; 
import java.util.Queue; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.function.Supplier; 

class QueueDemo { 
    private static final int NUM_NODES = 1000000; 
    private static final int NUM_TRIALS = 10; 
    private static final int NUM_THREADS = 8; 
    private static final Object ANY_OBJECT = new Object(); 

    public static void main(String[] args) { 
     Queue<Object> qN = new ConcurrentLinkedQueue<>(); 
     Queue<Object> qB = new LinkedBlockingQueue<>(); 
     Queue<Object> qA = new ArrayBlockingQueue<>(NUM_NODES+1); 

     for (int i=0 ; i<NUM_TRIALS ; i++) { 
      doOneTrial(qN, "non-blocking"); 
      doOneTrial(qB, " blocking"); 
      doOneTrial(qA, "  Array"); 
     } 
    } 

    private static void doOneTrial(final Queue<Object> q, String name) { 
     List<CompletableFuture<Integer>> futures = new ArrayList<>(NUM_THREADS); 
     CountDownLatch startSignal = new CountDownLatch(1); 
     CountDownLatch doneSignal = new CountDownLatch(NUM_THREADS); 

     fillQueue(q); 
     for (int i=0 ; i<NUM_THREADS ; i++) { 
      futures.add(CompletableFuture.supplyAsync(new Supplier<Integer>() { 
       public Integer get() { 
        int count = 0; 
        wait4(startSignal); 
        while (q.poll() != null) count++; 
        doneSignal.countDown(); 
        return count; 
       } 
      })); 
     } 

     long startTime = System.currentTimeMillis(); 
     startSignal.countDown(); 
     wait4(doneSignal); 
     long endTime = System.currentTimeMillis(); 

     int count = 0; 
     for (CompletableFuture<Integer> future : futures) { 
      count += future.join(); 
     } 
     if (count == NUM_NODES) { 
      System.out.println(name + ", " + Long.toString(endTime-startTime)); 
     } else { 
      System.out.println("Aieeeeeegh!"); 
      System.exit(1); 
     } 
    } 

    private static void fillQueue(Queue<Object> q) { 
     for (int i=0 ; i<NUM_NODES ; i++) { 
      q.add(ANY_OBJECT); 
     } 
    } 

    private static void wait4(CountDownLatch latch) { 
     try { 
      latch.await(); 
     } catch (InterruptedException ex) { 
      throw new RuntimeException(ex); 
     } 
    } 
} 
Problemi correlati