2012-07-04 7 views
7

Ho bisogno di generare N thread di consumo, che elaborano simultaneamente lo stesso InputStream, ad esempio: trasformarlo in qualche modo, calcolare il checksum o la firma digitale ecc. Questi consumatori non dipendono l'uno dall'altro e tutti stanno usando librerie di terze parti, che accettano InputStream come fonte di dati.Elaborazione contemporanea di InputStream singolo con utenti indipendenti

Quindi quello che posso fare è - a creare un po 'di attuazione InputStream, che sarà

  • lettura blocco di dati provenienti da "genitore" flusso
  • consumatori sbloccare
  • attendere che tutti i consumatori di leggere il pezzo intero
  • leggere prossimo pezzo

pur essendo alla ricerca semplice, si può salire vari problemi come livel ck quando alcuni consumatori muoiono, implementano tutti i metodi InputStream, controllano fork/join dei consumatori stessi usando barriere/latch ecc.

Un amico mi ha detto che è mezz'ora da implementare, ha reso la mia serata.

Preferirei utilizzare qualcosa di abbastanza maturo (googling non è stato fornito con risultati, il mio google-fu non è abbastanza buono?) O non preoccuparti e copiare l'intero flusso "sorgente" in un file temporaneo e usarlo come fonte di dati. Quest'ultima soluzione sembra essere più affidabile, ma potrebbe finire con la creazione di file di gigabyte (durante l'elaborazione dell'audio in streaming, ad esempio).

+0

È possibile scrivere i dati su un file e generare N FileInputStreams? –

+0

@JonLin Come ha detto verso la fine della domanda, può farlo. –

risposta

3

Per come la vedo io, dovresti avere almeno una sorta di buffering in modo che consumatori diversi possano spostarsi attraverso lo streaming a ritmi diversi senza che tutto venga costantemente impantanato dal consumatore al momento più lento. Ciò garantisce sostanzialmente le prestazioni nel caso peggiore e pochissimi benefici della concorrenza.

È possibile, ad esempio, etichettare ogni blocco con i consumatori che lo hanno utilizzato finora e quindi eliminare quelli completamente esauriti. Forse questo potrebbe essere ottenuto da ciascun consumatore con un riferimento a ogni pezzo che non ha ancora utilizzato, il che consentirebbe a GC di occuparsi automaticamente dei pezzi usati. Il produttore potrebbe mantenere un elenco di WeakReference s in blocchi in modo che abbia un handle sul numero di blocchi ancora da utilizzare e basare la sua limitazione su quello.

Sto anche pensando di avere un'istanza InputStream per thread separata, che comunica internamente con il produttore InputStream. In questo modo hai una soluzione facile per il tuo rischio di vita: try ... finally { is.close(); } - il consumatore morente chiude il proprio inputstream. Questo è comunicato al produttore.

Ho alcune idee con l'utilizzo di un ArrayBlockingQueue per utente. Ci sarebbe qualche difficoltà nel garantire che tutti i consumatori siano nutriti correttamente, senza che il produttore sia bloccato o occupato-aspetta.

+0

Non direi che è davvero poco vantaggioso: avendo 5 consumatori che lavorano per 1 secondo e un consumatore che lavora per 2 secondi, l'invocazione simultanea darà 2 secondi mentre il sequenziale darà 7 secondi. O mi sto perdendo qualcosa qui? Avendo taggati blocchi e buffer, colpirò il consumo di memoria, cosa che vorrei evitare. – jdevelop

+0

Sì, quello che dici è inevitabile. Tuttavia, se in media i consumatori sono in equilibrio, ma le loro prestazioni variano ampiamente, perderai l'opportunità di concorrenza se aspetti sempre che ogni consumatore sia attualmente in ritardo. Buffering avrebbe aiutato lì. E se si introduce il bilanciamento prioritario del thread, si potrebbe effettivamente ottenere una situazione del genere. –

0

Hai considerato l'utilizzo di stream di tubi? Il tuo produttore può avere uno o più PipedOuputStream in cui getta qualunque cosa legga dal file. Dall'altro lato dei tubi, avete diversi thread di consumo che leggono su un corrispondente PipedInputstream (che è un InputStream che è possibile condividere con le vostre librerie).

Il thread del produttore può decidere tramite quale dei dati dei tubi devono essere inviati, in questo modo, fornendo i dati da elaborare per una determinata lettura del thread di consumo sull'altro lato del tubo.

Se è necessario recuperare i dati dai thread dei consumatori, è possibile creare un'altra pipe, nella direzione opposta, per inviare i dati all'utente.

+1

Un 'PipedOutputStream' bloccherà il produttore non appena un consumatore resterà indietro, affamando tutti gli altri consumatori. –

0

È possibile provare alcune implementazioni JMS (Java Messaging Service) come Apache ActiveMQ.

Nel tuo caso dovresti creare un cosiddetto argomento (vedi Topics vs. Queues). Un argomento è creato dal produttore e pubblicato su N consumatori, che possono essere eseguiti contemporaneamente, con ogni consumatore che riceve esattamente gli stessi dati.

Poiché si desidera utilizzare InputStream s, è disponibile un capitolo su come send messages are streams.

Suppongo, in genere, produttori e consumatori sarebbero processi separati, probabilmente in esecuzione su macchine diverse sulla rete. Penso che tu possa configurarlo per funzionare completamente in una JVM, però. Ciò dipenderebbe dall'implementazione di JMS. Questi sono anche piuttosto famosi: HornetQ by JBoss, RabbitMQ e un sacco di altri.

Problemi correlati