2011-10-13 8 views
10

La domanda è semplice e sono sorpreso che non sia comparso immediatamente quando l'ho cercato.Come elaborare le righe di un file CSV utilizzando Groovy/GPars nel modo più efficiente?

Ho un file CSV, potenzialmente molto grande, che deve essere elaborato. Ogni riga deve essere consegnata a un processore fino a quando tutte le righe vengono elaborate. Per leggere il file CSV, userò OpenCSV che essenzialmente fornisce un metodo readNext() che mi dà la riga successiva. Se non sono disponibili più righe, tutti i processori dovrebbero terminare.

Per questo ho creato uno script groovy davvero semplice, definito un metodo readNext() sincrono (poiché la lettura della riga successiva non richiede molto tempo) e quindi creato alcuni thread che leggono la riga successiva e lo elaborano. Funziona bene, ma ...

Non dovrebbe esserci una soluzione integrata che potrei usare? Non è l'elaborazione della collezione gpars, perché presuppone sempre che ci sia una collezione esistente in memoria. Invece, non posso permettermi di leggere tutto in memoria e poi elaborarlo, porterebbe a eccezioni eccezionali.

Quindi ... chiunque abbia un modello piacevole per l'elaborazione di un file CSV "riga per riga" utilizzando un paio di thread di lavoro?

risposta

6

L'accesso simultaneo a un file potrebbe non essere una buona idea e l'elaborazione fork/join di GPars è destinata esclusivamente ai dati in memoria (raccolte). Il mio suggerimento sarebbe quello di leggere il file in modo sequenziale in una lista. Quando l'elenco raggiunge una certa dimensione, elaborare le voci nell'elenco contemporaneamente utilizzando GPars, cancellare l'elenco e quindi andare avanti con le linee di lettura.

2

Sto semplicemente completando un'implementazione di un problema proprio come questo in Grails (non si specifica se si utilizzano graal, plain hibernate, plain JDBC o altro).

Non c'è niente fuori dalla scatola che si possa ottenere di cui sono a conoscenza. Potresti guardare all'integrazione con Spring Batch, ma l'ultima volta che l'ho guardato mi sembrava molto pesante (e non molto groovy).

Se si utilizza un semplice JDBC, fare ciò che Christoph consiglia è probabilmente la cosa più semplice da fare (leggere le righe N e utilizzare GPars per scorrere simultaneamente su tali righe).

Se si utilizzano graal o si ibernano e si desidera che i thread di lavoro abbiano accesso al contesto di primavera per l'iniezione delle dipendenze, le cose si complicano leggermente.

Il modo in cui ho risolto sta usando il plug-in Grails Redis (disclaimer: io sono l'autore) e la Jesque plugin, che è un'implementazione Java di Resque.

Il plug-in Jesque consente di creare classi "Lavoro" che hanno un metodo "processo" con parametri arbitrari che vengono utilizzati per elaborare il lavoro accodato in una coda di gesuiti. Puoi far ruotare quanti più lavoratori vuoi.

Ho un caricamento di file a cui un utente amministratore può inviare un file, salva il file su disco e accoda un lavoro per ProducerJob che ho creato. That ProducerJob gira il file, per ogni riga, accoda un messaggio per un ConsumerJob da raccogliere. Il messaggio è semplicemente una mappa dei valori letti dal file CSV.

ConsumerJob acquisisce tali valori e crea l'oggetto dominio appropriato per la propria riga e lo salva nel database.

Abbiamo già utilizzato Redis in produzione, quindi l'utilizzo di questo come meccanismo di accodamento aveva senso.Avevamo un vecchio carico sincrono che passava attraverso i carichi di file in serie. Attualmente sto utilizzando un worker worker e 4 consumer worker e il caricamento delle cose in questo modo è superiore a 100 volte più veloce del vecchio carico (con un feedback di avanzamento molto migliore per l'utente finale).

Sono d'accordo con la domanda iniziale che probabilmente c'è spazio per qualcosa di simile da confezionare perché questa è una cosa relativamente comune.

AGGIORNAMENTO: Ho inserito a blog post with a simple example doing imports with Redis + Jesque.

5

Questo potrebbe essere un buon problema per gli attori. Un attore del lettore sincrono può trasferire le linee CSV agli attori del processore parallelo. Ad esempio:

@Grab(group='org.codehaus.gpars', module='gpars', version='0.12') 

import groovyx.gpars.actor.DefaultActor 
import groovyx.gpars.actor.Actor 

class CsvReader extends DefaultActor { 
    void act() { 
     loop { 
      react { 
       reply readCsv() 
      } 
     } 
    } 
} 

class CsvProcessor extends DefaultActor { 
    Actor reader 
    void act() { 
     loop { 
      reader.send(null) 
      react { 
       if (it == null) 
        terminate() 
       else 
        processCsv(it) 
      } 
     } 
    } 
} 

def N_PROCESSORS = 10 
def reader = new CsvReader().start() 
(0..<N_PROCESSORS).collect { new CsvProcessor(reader: reader).start() }*.join() 
+0

Si suppone che in questo esempio la chiamata readCsv() restituisca una singola riga del CSV? Voglio solo assicurarmi che sto leggendo bene. – Scott

+0

Sì, 'readCsv()' leggerà ogni riga in sequenza. Quando viene raggiunta la fine del file, restituisce null, che consente ai processori di sapere che la fine è stata raggiunta e che devono 'terminate()'. – ataylor

Problemi correlati