2012-05-16 13 views
6

Sono ancora nel processo di avvolgere il mio cervello su come la concorrenza funziona in Java. Capisco che (se si sottoscrive il modello di concorrenza OO Java 5) si implementa uno Task o Callable con un metodo run() o call() (rispettivamente) e si consiglia di parallelizzare il più possibile il metodo implementato.Multithreading di un file di massa leggere

Ma non ho ancora capire qualcosa di inerente su programmazione concorrente in Java:

  • Come è un Task 's metodo run() assegnato la giusta quantità di lavoro simultanee da eseguire?

Come esempio concreto, che cosa se ho un readMobyDick() metodo di I/O-bound che legge l'intero contenuto di Herman Melville Moby Dick in memoria da un file sul sistema locale. E diciamo solo che voglio questo metodo readMobyDick() di essere concorrente e gestito da 3 Riquadri, dove:

  • Discussione # 1 legge il primo 1/3rd del libro in memoria
  • Discussione # 2 legge il secondo 1/3 ° del libro in memoria
  • Discussione # 3 recita l'ultimo 1/3rd del libro in memoria

Devo pezzo Moby Dick in tre file e ogni passata al proprio compito o semplicemente chiamare lo readMobyDick() dall'interno dell'imp lemented run() e (in qualche modo) il Executor sa come interrompere il lavoro tra i thread.

Sono uno studente molto visivo, quindi tutti gli esempi di codice del modo giusto per avvicinarsi a questo sono molto apprezzati! Grazie!

+0

Grande nome del metodo! Quando ho provato a leggere Moby Dick, ho scoperto che dovevo farlo contemporaneamente (interlacciato con altri libri) ;-) –

+0

Quindi ... la grande balena bianca sarebbe .NET, sì? –

risposta

14

Probabilmente avete scelto per caso l'esempio peggiore assoluto di attività parallele!

La lettura in parallelo da un singolo disco meccanico è in realtà più lenta rispetto alla lettura con un singolo thread, perché si sta infatti rimbalzando la testina meccanica su diverse sezioni del disco mentre ogni thread ha il proprio turno di esecuzione. È meglio lasciarlo come un'attività a thread singolo.

Facciamo un altro esempio, che è simile al tuo ma può in realtà offrire alcuni vantaggi: supponiamo che voglio cercare le occorrenze di una certa parola in un elenco enorme di parole (questo elenco potrebbe anche provenire da un file su disco , ma come ho detto, letto da un singolo thread). Supponiamo che io possa utilizzare 3 thread come nell'esempio, ognuno dei quali cerca su 1/3 dell'enorme elenco di parole e mantiene un contatore locale di quante volte è apparsa la parola cercata.

In questo caso, è necessario suddividere l'elenco in 3 parti, passare ciascuna parte a un oggetto diverso il cui tipo implementa Runnable e la ricerca implementata nel metodo run.

Lo stesso runtime non ha idea di come eseguire il partizionamento o qualcosa del genere, è necessario specificarlo manualmente. Ci sono molte altre strategie di partizionamento, ciascuna con i suoi punti di forza e di debolezza, ma per ora possiamo attenerci al partizionamento statico.

Vediamo po 'di codice:

class SearchTask implements Runnable { 
    private int localCounter = 0; 
    private int start; // start index of search 
    private int end; 
    private List<String> words; 
    private String token; 

    public SearchTask(int start, int end, List<String> words, String token) { 
     this.start = start; 
     this.end = end; 
     this.words = words; 
     this.token = token; 
    } 

    public void run() { 
     for(int i = start; i < end; i++) { 
       if(words.get(i).equals(token)) localCounter++; 
     } 
    } 

    public int getCounter() { return localCounter; } 
} 

// meanwhile in main :) 

List<String> words = new ArrayList<String>(); 
// populate words 
// let's assume you have 30000 words 

// create tasks 
SearchTask task1 = new SearchTask(0, 10000, words, "John"); 
SearchTask task2 = new SearchTask(10000, 20000, words, "John"); 
SearchTask task3 = new SearchTask(20000, 30000, words, "John"); 

// create threads 
Thread t1 = new Thread(task1); 
Thread t2 = new Thread(task1); 
Thread t3 = new Thread(task1); 

// start threads 
t1.start(); 
t2.start(); 
t3.start(); 

// wait for threads to finish 
t1.join(); 
t2.join(); 
t3.join(); 

// collect results 
int counter = 0; 
counter += task1.getCounter(); 
counter += task2.getCounter(); 
counter += task3.getCounter(); 

Questo dovrebbe funzionare bene. Si noti che nei casi pratici si dovrebbe creare uno schema di partizionamento più generico. In alternativa, è possibile utilizzare ExecutorService e implementare Callable anziché Runnable se si desidera restituire un risultato.

Così un esempio alternativo utilizzando costrutti più avanzati:

class SearchTask implements Callable<Integer> { 
    private int localCounter = 0; 
    private int start; // start index of search 
    private int end; 
    private List<String> words; 
    private String token; 

    public SearchTask(int start, int end, List<String> words, String token) { 
     this.start = start; 
     this.end = end; 
     this.words = words; 
     this.token = token; 
    } 

    public Integer call() { 
     for(int i = start; i < end; i++) { 
       if(words.get(i).equals(token)) localCounter++; 
     } 
     return localCounter; 
    }   
} 

// meanwhile in main :) 

List<String> words = new ArrayList<String>(); 
// populate words 
// let's assume you have 30000 words 

// create tasks 
List<Callable> tasks = new ArrayList<Callable>(); 
tasks.add(new SearchTask(0, 10000, words, "John")); 
tasks.add(new SearchTask(10000, 20000, words, "John")); 
tasks.add(new SearchTask(20000, 30000, words, "John")); 

// create thread pool and start tasks 
ExecutorService exec = Executors.newFixedThreadPool(3); 
List<Future> results = exec.invokeAll(tasks); 

// wait for tasks to finish and collect results 
int counter = 0; 
for(Future f: results) { 
    counter += f.get(); 
} 
+0

Quindi quale sarebbe un buon esempio di un'attività che trarrebbe beneficio dal multithreading? Non mi interessa affatto di leggere i file dal disco - mi interessa vedere un esempio vivente, che respira (** codice **) di come il lavoro viene suddiviso in pezzi e alimentato per le attività. – IAmYourFaja

+0

@herpylderp: ho pubblicato una modifica. Il codice è in arrivo. :) – Tudor

+0

Un buon esempio potrebbe essere una coda servita da più thread –

1

Hai scelto un cattivo esempio, come Tudor stato così gentile da sottolineare. L'hardware del disco rotante è soggetto a vincoli fisici per lo spostamento di piatti e testine e l'implementazione di lettura più efficiente consiste nel leggere ogni blocco in ordine, riducendo la necessità di spostare la testina o attendere l'allineamento del disco.

Detto questo, alcuni sistemi operativi non memorizzano sempre le cose continuamente sui dischi, e per coloro che ricordano, la deframmentazione potrebbe fornire un incremento delle prestazioni del disco se il sistema operativo/file system non ha fatto il lavoro per voi.

Come hai accennato a volere un programma che ne trarrebbe beneficio, lascia che ne suggerisca uno semplice, con aggiunta di matrice.

Supponendo di aver creato un thread per core, è possibile dividere banalmente due matrici da aggiungere in righe N (una per ogni thread). Matrix Inoltre (se vi ricordate) funziona come tale:

A + B = C 

o

[ a11, a12, a13 ] [ b11, b12, b13] = [ (a11+b11), (a12+b12), (a13+c13) ] 
[ a21, a22, a23 ] + [ b21, b22, b23] = [ (a21+b21), (a22+b22), (a23+c23) ] 
[ a31, a32, a33 ] [ b31, b32, b33] = [ (a31+b31), (a32+b32), (a33+c33) ] 

Quindi, per distribuire questo attraverso N fili, abbiamo semplicemente bisogno di prendere il conteggio delle righe e dividere il modulo per il numero di thread per ottenere l'ID del thread con cui verrà aggiunto.

matrix with 20 rows across 3 threads 
row % 3 == 0 (for rows 0, 3, 6, 9, 12, 15, and 18) 
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19) 
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17) 
// row 20 doesn't exist, because we number rows from 0 

Ora ogni filo "sa" cui un file deve gestire, ei risultati "per riga" può essere calcolato banalmente perché i risultati non si intersecano nel dominio di altro filo della computazione.

Tutto ciò che è necessario ora è una struttura di dati "risultato" che tiene traccia quando i valori sono stati calcolati e quando viene impostato l'ultimo valore, quindi il calcolo è completo. In questo esempio "finto" di un risultato di aggiunta di matrice con due thread, il calcolo della risposta con due thread richiede circa la metà del tempo.

// the following assumes that threads don't get rescheduled to different cores for 
// illustrative purposes only. Real Threads are scheduled across cores due to 
// availability and attempts to prevent unnecessary core migration of a running thread. 
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3) 
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1) 
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3) 
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1) 

Problemi più complessi possono essere risolti dal multithreading e diversi problemi vengono risolti con tecniche diverse. Ho scelto di proposito uno degli esempi più semplici.

1

di implementare un'attività o richiamabile con un metodo run() o chiamare il numero() (rispettivamente), e conviene di parallelizzare come gran parte di quel metodo implementato il più possibile.

Un Task rappresenta un'unità distinta di lavoro
Caricamento di un file nella memoria è un'unità discreta di lavoro e può pertanto questa attività può essere delegata a un thread in background. Cioè un thread in background esegue questa attività di caricamento del file.
È un'unità di lavoro discreta poiché non ha altre dipendenze necessarie per svolgere il proprio lavoro (caricare il file) e ha confini discreti.
Quello che stai chiedendo è di dividerlo ulteriormente in attività. Cioè un thread carica 1/3 del file mentre un altro thread il 2/3 ecc.
Se fosse possibile suddividere l'attività in ulteriori sottoattività, non sarebbe un compito in primo luogo per definizione. Quindi caricare un file è una singola attività da solo.

Per fare un esempio:
Diciamo che avete una GUI e dovete presentare ai dati utente da 5 file diversi. Per presentarli è necessario anche preparare alcune strutture dati per elaborare i dati effettivi.
Tutte queste sono attività separate.
E.g. il caricamento dei file è di 5 compiti diversi, quindi potrebbe essere fatto da 5 diversi thread.
La preparazione delle strutture di dati potrebbe essere eseguita in un thread diverso.
La GUI viene eseguita in un altro thread.
Tutto ciò può accadere in concomitanza

-1

Se si sistema supportato di alto throughput di I/O, ecco come si può fare:

How to read a file using multiple threads in Java when a high throughput(3GB/s) file system is available

Ecco la soluzione per leggere un singolo file con più thread.

Dividere il file in blocchi N, leggere ogni blocco in un thread, quindi unirli in ordine. Fai attenzione alle linee che attraversano i confini del chunk. E 'l'idea di base come suggerito dall'utente slaks

bench-marking sotto realizzazione di multipli-thread per un singolo file da 20 GB:

1 Discussione: 50 secondi: 400 MB/s

2 Fili: 30 secondi: 666 MB/s

4 Discussioni: 20 secondi: 1 GB/s

8 Discussioni: 60 Secon ds: 333 MB/s

readAllLines Java7 equivalenti(): 400 secondi: 50 MB/s

Nota: questo può funzionare solo su sistemi che sono progettati per supportare high-throughput di I/O, e non su soliti personal computer

Ecco le lendini essenziali del codice, per maggiori dettagli, segui il link

public class FileRead implements Runnable 
{ 

private FileChannel _channel; 
private long _startLocation; 
private int _size; 
int _sequence_number; 

public FileRead(long loc, int size, FileChannel chnl, int sequence) 
{ 
    _startLocation = loc; 
    _size = size; 
    _channel = chnl; 
    _sequence_number = sequence; 
} 

@Override 
public void run() 
{ 
     System.out.println("Reading the channel: " + _startLocation + ":" + _size); 

     //allocate memory 
     ByteBuffer buff = ByteBuffer.allocate(_size); 

     //Read file chunk to RAM 
     _channel.read(buff, _startLocation); 

     //chunk to String 
     String string_chunk = new String(buff.array(), Charset.forName("UTF-8")); 

     System.out.println("Done Reading the channel: " + _startLocation + ":" + _size); 

} 

//args[0] is path to read file 
//args[1] is the size of thread pool; Need to try different values to fing sweet spot 
public static void main(String[] args) throws Exception 
{ 
    FileInputStream fileInputStream = new FileInputStream(args[0]); 
    FileChannel channel = fileInputStream.getChannel(); 
    long remaining_size = channel.size(); //get the total number of bytes in the file 
    long chunk_size = remaining_size/Integer.parseInt(args[1]); //file_size/threads 


    //thread pool 
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1])); 

    long start_loc = 0;//file pointer 
    int i = 0; //loop counter 
    while (remaining_size >= chunk_size) 
    { 
     //launches a new thread 
     executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i)); 
     remaining_size = remaining_size - chunk_size; 
     start_loc = start_loc + chunk_size; 
     i++; 
    } 

    //load the last remaining piece 
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i)); 

    //Tear Down 

} 

} 
Problemi correlati