2012-02-27 9 views
14

Sono interessato a una struttura dati identica a Java BlockingQueue, con l'eccezione che deve essere in grado di raggruppare oggetti nella coda. In altre parole, vorrei che il produttore fosse in grado di mettere oggetti in coda, ma avere il blocco utente su take() fino a quando la coda raggiunge una certa dimensione (la dimensione del batch).Java BlockingQueue con il batching?

Quindi, una volta che la coda ha raggiunto la dimensione del batch, il produttore deve bloccare put() fino a quando il consumatore ha consumato tutti gli elementi nella coda (nel qual caso il produttore inizierà a produrre nuovamente e il blocco del consumatore fino al batch è raggiunto di nuovo).

Esiste una struttura dati simile? O dovrei scriverlo (cosa che non mi dispiace), non voglio sprecare il mio tempo se c'è qualcosa là fuori.


UPDATE

Forse a chiarire le cose un po ':

La situazione sarà sempre come segue. Possono esserci più produttori che aggiungono oggetti alla coda, ma non ci sarà mai più di un consumatore che preleva oggetti dalla coda.

Ora, il problema è che ci sono più di questi setup in parallelo e seriale. In altre parole, i produttori producono articoli per più code, mentre i consumatori a loro volta possono anche essere produttori. Questo può essere più facilmente pensato come un grafico diretto di produttori, produttori di consumatori e, infine, consumatori.

Il motivo per cui i produttori devono bloccare fino a quando le code non sono vuote (@Peter Lawrey) è perché ognuno di questi verrà eseguito in una discussione. Se li lasci semplicemente a produrre quando lo spazio diventa disponibile, finirai con una situazione in cui hai troppi thread che tentano di elaborare troppe cose contemporaneamente.

Forse l'abbinamento con un servizio di esecuzione potrebbe risolvere il problema?

risposta

9

Ti suggerisco di utilizzare BlockingQueue.drainTo(Collection, int). Puoi usarlo con take() per assicurarti di ottenere un numero minimo di elementi.

Il vantaggio di utilizzare questo approccio è che le dimensioni del batch crescono in modo dinamico con il carico di lavoro e il produttore non deve bloccare quando il consumatore è occupato. cioè si auto-ottimizza per latenza e velocità effettiva.


Per implementare esattamente come richiesto (che credo sia una cattiva idea) è possibile utilizzare una SynchronousQueue con un filo consumo occupato.

cioè il filo che consumano fa un

list.clear(); 
while(list.size() < required) list.add(queue.take()); 
// process list. 

Il produttore si bloccano quando mai il consumatore è occupato.

+0

Voglio bloccare il produttore mentre il consumatore è occupato. –

+1

Interessante, la maggior parte dei sistemi fa di tutto per evitarlo. ;) Il secondo suggerimento farà esattamente questo. Se vuoi bloccare il produttore, perché stai usando più thread? non sarebbe più semplice per il "produttore" essere il processore/consumatore, così come non sembra che vogliano che funzionino allo stesso tempo. –

+0

Si prega di consultare il mio aggiornamento. Il design richiede anche ai produttori di bloccare in modo che il numero di thread in esecuzione sia mantenuto basso. Inoltre, risolve un problema di dipendenza tra produttori e consumatori. –

1

Non che io sappia. Se ho capito bene, vuoi che il produttore funzioni (mentre il consumatore è bloccato) fino a quando non riempie la coda o il consumatore al lavoro (mentre il produttore blocca) finché non cancella la coda. In tal caso, suggerisco di non aver bisogno di una struttura dati ma di un meccanismo per bloccare l'interlocutore mentre l'altro sta lavorando in un mutex fasion. È possibile bloccare un oggetto per quello e internamente avere la logica del pieno o del vuoto per rilasciare il blocco e passarlo all'altro interlocutore. Quindi, in breve, dovresti scrivere tu stesso :)

1

Questo suona come il RingBuffer funziona nel modello di LMAX Disruptor. Vedi http://code.google.com/p/disruptor/ per ulteriori informazioni.

Una spiegazione molto approssimativa è che la struttura dati principale è il RingBuffer. I produttori inseriscono i dati nel buffer circolare in sequenza e i consumatori possono estrarre tutti i dati che il produttore ha inserito nel buffer (quindi essenzialmente batch). Se il buffer è pieno, il produttore blocca fino a quando l'utente non ha finito e ha liberato gli slot nel buffer.

2

Ecco un'implementazione rapida (= semplice ma non completamente testata) che ritengo possa essere adatta alle vostre richieste: dovreste essere in grado di estenderla per supportare l'intera interfaccia della coda, se necessario.

per aumentare le prestazioni è possibile passare a ReentrantLock invece di utilizzare parole chiave "sincronizzato" ..

public class BatchBlockingQueue<T> { 

    private ArrayList<T> queue; 
    private Semaphore readerLock; 
    private Semaphore writerLock; 
    private int batchSize; 

    public BatchBlockingQueue(int batchSize) { 
     this.queue = new ArrayList<>(batchSize); 
     this.readerLock = new Semaphore(0); 
     this.writerLock = new Semaphore(batchSize); 
     this.batchSize = batchSize; 
    } 

    public synchronized void put(T e) throws InterruptedException { 
     writerLock.acquire(); 
     queue.add(e); 
     if (queue.size() == batchSize) { 
      readerLock.release(batchSize); 
     } 
    } 

    public synchronized T poll() throws InterruptedException { 
     readerLock.acquire(); 
     T ret = queue.remove(0); 
     if (queue.isEmpty()) { 
      writerLock.release(batchSize); 
     } 
     return ret; 
    } 

} 

Spero vi sia utile.