2012-04-04 9 views
5

Ho una situazione in cui thread diversi popolano una coda (produttori) e un utente recupera elemento da questa coda. Il mio problema è che quando uno di questi elementi viene recuperato dalla coda, alcuni vengono persi (segnale mancante?). Il codice di produttori è:java simultane: multi-producer one-consumer

class Producer implements Runnable { 

    private Consumer consumer; 

    Producer(Consumer consumer) { this.consumer = consumer; } 

    @Override 
public void run() { 
    consumer.send("message"); 
    } 
} 

e si sono creati e correre con:

ExecutorService executor = Executors.newSingleThreadExecutor(); 
for (int i = 0; i < 20; i++) { 
    executor.execute(new Producer(consumer)); 
} 

Codice del consumo è:

class Consumer implements Runnable { 

private Queue<String> queue = new ConcurrentLinkedQueue<String>(); 

void send(String message) { 
    synchronized (queue) { 
     queue.add(message); 
     System.out.println("SIZE: " + queue.size()); 
     queue.notify(); 
    } 
} 

@Override 
public void run() { 
    int counter = 0; 
    synchronized (queue) { 
    while(true) { 
     try { 
      System.out.println("SLEEP"); 
       queue.wait(10); 
     } catch (InterruptedException e) { 
       Thread.interrupted(); 
     } 
     System.out.println(counter); 
     if (!queue.isEmpty()) {    
      queue.poll(); 
      counter++; 
     } 
    } 
    } 
} 

} 

Quando viene eseguito il codice ottengo a volte 20 elementi aggiunti e 20 recuperati, ma in altri casi gli elementi recuperati sono inferiori a 20. Qualche idea su come risolverli?

+0

Si sta utilizzando una strana combinazione di costrutti di sincronizzazione di basso livello ('wait',' notify') e quelli di alto livello ('ConcurrentLinkedQueue',' ExecutorService'). Usa l'uno o l'altro! – artbristol

+0

L'ho fatto ma in entrambi i casi ho lo stesso problema – Randomize

+0

Non riesco a vedere il codice che esegue effettivamente Consumer. – dhblah

risposta

10

Suggerirei di utilizzare un BlockingQueue anziché una coda. Un LinkedBlockingDeque potrebbe essere un buon candidato per te.

Il codice sarà simile a questa:

void send(String message) { 
    synchronized (queue) { 
     queue.put(message); 
     System.out.println("SIZE: " + queue.size()); 
    } 
} 

e poi avresti bisogno di appena

queue.take() 

sul filo dei consumatori

L'idea è che .Prendere() sarà bloccare fino a quando un articolo è disponibile nella coda e quindi restituire esattamente uno (che è dove penso che la tua implementazione soffre: mancata notifica whil e polling). .put() è responsabile di fare tutte le notifiche per te. Nessuna attesa/notifica necessaria.

+0

Provato LinkedBlockingDeque ma ho ancora lo stesso problema – Randomize

+0

@Randomize puoi pubblicare un esempio del codice problematico utilizzando un BlockingQueue? Il codice del consumatore dovrebbe essere sufficiente. – charisis

+0

Sto riutilizzando esattamente lo stesso codice sopra ho appena sostituito ConcurrentLinkedQueue con LinkedBlockingDeque. – Randomize

2

Il problema nel codice è probabilmente dovuto al fatto che si utilizza notify anziché notifyAll. Il primo risveglierà solo un thread, se ce n'è uno in attesa sul blocco. Ciò consente una condizione di gara in cui nessun thread è in attesa e il segnale è perso. Una notificaAll forzerà la correttezza a un costo minore delle prestazioni richiedendo a tutti i thread di svegliarsi per verificare se possono ottenere il blocco.

Questo è meglio spiegato in Effective Java 1st ed (vedi p.150). La seconda edizione ha rimosso questo suggerimento dal momento che i programmatori dovrebbero utilizzare java.util.concurrent che fornisce maggiori garanzie di correttezza.

+0

Ho usato notifyAll ma non ha funzionato – Randomize

+0

C'è un solo utente così notify/notifyAll non fa differenza –

1

Sembra una cattiva idea utilizzare ConcurrentLinkedQueue e la sincronizzazione entrambi allo stesso tempo. In primo luogo, sfida lo scopo delle strutture di dati concorrenti.

Non c'è alcun problema con la struttura dati ConcurrentLinkedQueue e la sua sostituzione con BlockingQueue risolverà il problema ma questa non è la causa principale.

Il problema è con queue.wait (10). Questo è il metodo di attesa a tempo. Acquisisce il blocco una volta trascorso 10 ms.

  1. notifica (queue.notify()) andranno persi perché non c'è nessun filo dei consumatori in attesa su di esso se 10ms è trascorso.

  2. Il produttore non sarà in grado di aggiungere alla coda poiché non è possibile acquisire il blocco perché il blocco viene rivendicato nuovamente dall'utente.

Trasferirsi in BlockingQueue risolto il problema, perché è stato rimosso la vostra attesa (10) codice e aspettare e notifica è stata curata dalla struttura di dati BlockingQueue.