2013-02-11 21 views
11

Ho un'applicazione Java che ha thread di lavoro per elaborare i lavori. Un lavoratore produce un oggetto risultato, dire qualcosa del tipo:Passaggio in thread-safe Java di oggetti di raccolta da un thread a un altro

class WorkerResult{ 
    private final Set<ResultItems> items; 
    public Worker(Set<ResultItems> pItems){ 
     items = pItems; 
    } 
} 

Quando il lavoratore finisce, lo fa questa operazione:

... 
final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>(); 
for(Item producedItem : ...){ 
     items.add(item); 
} 
passToGatherThread(items); 

Il set items è una specie di "lavoro di unità di" qui . Il metodo passToGatherThread passa lo items impostato su un thread di raccolta, di cui solo uno esiste in fase di esecuzione.

Qui la sincronizzazione non è necessaria qui, poiché le condizioni di gara non possono verificarsi perché solo un thread (Gather-thread) legge il set items. AFAICS, il gather-thread potrebbe non vedere tutti gli elementi perché il set non è thread-safe, giusto?

Suppongo di non riuscire a sincronizzare passToGatherThread, perché è una libreria di terze parti. Quello che temo fondamentalmente è che il thread di raccolta non veda tutti gli elementi a causa del caching, le ottimizzazioni della VM, ecc. Ecco quindi la domanda: come passare gli elementi impostati in modo thread-safe, in modo che il thread di Gather "veda" l'insieme corretto di elementi?

+0

Non sono sicuro, ma forse [PipedStreams] (http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html) può aiutarti? – oliholz

+1

Qual è la definizione di 'passToGatherthread'? Penso che questo sia cruciale per capire se le risposte fornite di seguito sono corrette. In che modo esattamente gli oggetti vengono passati al thread di raccolta? –

+0

Hai davvero problemi con la visibilità degli oggetti o sospetti solo un simile comportamento? Le tue preoccupazioni sembrano piuttosto inverosimili in quelle circostanze. – Dariusz

risposta

1

ho pensato (e discusso) questa domanda un sacco e sono venuto su con un'altra risposta, che, spero, sarà la soluzione migliore.

Il passaggio a una raccolta sincronizzata non va bene in termini di efficienza, poiché ogni operazione successiva su quella raccolta verrà sincronizzata - se ci sono molte operazioni, potrebbe rivelarsi un handicap.

Al punto: facciamo qualche ipotesi (che io non condivido):

  • il passToGatherThread metodo di cui è in effetti non sicuri, per quanto improbabile sembra
  • compilatore può riordinare gli eventi nel codice in modo che il passToGatherThread viene chiamato prima della raccolta è riempito

il, pulito e forse il modo più semplice e più efficiente per garantire che la raccolta passato al metodo raccoglitore è pronta e completa è t o mettere la spinta della raccolta in un blocco sincronizzato, in questo modo:

synchronized(items) { 
    passToGatherThread(items); 
} 

In questo modo si garantisce una sincronizzazione memoria e una sequenza accadere, prima valido prima passa la raccolta, facendo in modo che tutti gli oggetti vengono passati correttamente quindi.

+1

Penso che tu abbia ragione, e alla fine questo è esattamente il modo in cui ho riscontrato questo problema. Sembra un po 'brutto, vero? Sincronizzazione su un set appena creato, che per la maggior parte delle persone sembra semplicemente stupido ... ;-) Grazie per la risposta. – xSNRG

1

Non sembra esserci alcun problema di sincronizzazione qui. Crei un nuovo oggetto Set per ogni passToGatherThread e lo fai dopo aver modificato il set. Nessun oggetto andrà perso.

Insieme (e la maggior parte delle raccolte Java) è possibile accedere contemporaneamente a più thread purché non venga apportata alcuna modifica alla raccolta. Questo è ciò che è Collections.unmodifiableCollection.

Poiché il citato metodo passToGatherThread funge da comunicazione con altri thread, deve utilizzare una sorta di sincronizzazione e ogni sincronizzazione garantisce la coerenza della memoria tra i thread.

Inoltre, si noti che tutte le scritture sugli oggetti nella raccolta passata vengono effettuate prima dello e passate all'altro thread. Anche se la memoria viene copiata nella cache locale del thread, ha lo stesso valore non modificato dell'altro thread.

+1

AFAIK la JVM è autorizzata a memorizzare nella cache i dati laddove possibile nei registri del core del processore. Quando i dati scritti senza "svuotamento" (usando sincronizzato/volatile/ecc.) Altri thread possono vedere valori "stantio" o addirittura non del tutto poiché la semantica "avvenuta prima" è valida solo nel thread che produce i dati. – xSNRG

+0

@xSNRG ha aggiunto un paragrafo. Se il metodo pass è valido, non ci dovrebbe essere nulla di cui preoccuparsi. – Dariusz

+0

Inoltre, se questo non ha funzionato, un sacco di applicazioni non funzionerebbero. – Dariusz

1

È possibile utilizzare semplicemente una delle implementazioni thread-safe di Set che Java fornisce per il proprio WorkerResult. Si veda ad esempio:

Un'altra opzione è quella di utilizzare Collections.synchronizedSet().

+0

Perché è necessario? Cosa c'è di sbagliato nell'usare un'implementazione di 'Set' non thread-safe? –

+0

Penso che l'OP abbia paura del caching. I thread possono memorizzare nella cache dati non "volatili" e se le scritture su di essa non sono sincronizzate non si vede mai un aggiornamento. – joergl

+1

Sì, joergl ha assolutamente ragione. Vedi il mio commento sotto la risposta di Dariusz Wawer. – xSNRG

0

Il lavoratore implementa callable e restituisce WorkerResult:

class Worker implements Callable<WorkerResult> { 
    private WorkerInput in; 

    public Worker(WorkerInput in) { 
     this.in = in; 
    } 

    public WorkerResult call() { 
     // do work here 
    } 
} 

Poi abbiamo utilizzare un ExecutorService per gestire il pool di thread, e raccogliere i risultati via utilizzando futuro.

public class PooledWorkerController { 

    private static final int MAX_THREAD_POOL = 3; 
    private final ExecutorService pool = 
     Executors.newFixedThreadPool(MAX_THREAD_POOL); 

    public Set<ResultItems> process(List<WorkerInput> inputs) 
      throws InterruptedException, ExecutionException{   
     List<Future<WorkerResult>> submitted = new ArrayList<>(); 
     for (WorkerInput in : inputs) { 
      Future<WorkerResult> future = pool.submit(new Worker(in)); 
      submitted.add(future); 
     } 
     Set<ResultItems> results = new HashSet<>(); 
     for (Future<WorkerResult> future : submitted) { 
      results.addAll(future.get().getItems()); 
     } 
     return results; 
    } 
} 
+0

Così si crea una discussione ogni volta che viene avviato un lavoro. -1 per quello. – Dariusz

+0

nel tuo post hai introdotto una brutta pratica di creare thread ogni volta che viene chiamato un pacchetto di lavori. La creazione di thread è molto costosa e ogni volta che è possibile creare thread una volta e riutilizzati. Il tuo codice sarebbe IMO molto, molto meglio se avessi un singolo pool di thread inizializzato staticamente che verrebbe utilizzato in 'process'. La creazione di un nuovo pool di thread per ogni chiamata 'process' non cambia molto. – Dariusz

Problemi correlati