2011-10-11 13 views
34

Ho un file .csv contenente oltre 70 milioni di righe di cui ogni riga deve generare un eseguibile e quindi eseguito da threadpool. Questo Runnable inserirà un record in Mysql.Come riutilizzare un pool di thread dopo lo spegnimento

Cosa c'è di più, voglio registrare una posizione del file CSV per il RandomAccessFile da individuare. La posizione è scritta in un file . Desidero scrivere questo record quando tutti i thread in threadpool sono terminati. Viene richiamato ThreadPoolExecutor.shutdown(). Ma quando arrivano più linee, ho bisogno di un nuovo threadpool. Come posso riutilizzare questo threadpool corrente invece di crearne uno nuovo.

Il codice è il seguente:

public static boolean processPage() throws Exception { 

    long pos = getPosition(); 
    long start = System.currentTimeMillis(); 
    raf.seek(pos); 
    if(pos==0) 
     raf.readLine(); 
    for (int i = 0; i < PAGESIZE; i++) { 
     String lineStr = raf.readLine(); 
     if (lineStr == null) 
      return false; 
     String[] line = lineStr.split(","); 
     final ExperienceLogDO log = CsvExperienceLog.generateLog(line); 
     //System.out.println("userId: "+log.getUserId()%512); 

     pool.execute(new Runnable(){ 
      public void run(){ 
       try { 
        experienceService.insertExperienceLog(log); 
       } catch (BaseException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 

     long end = System.currentTimeMillis(); 
    } 

    BufferedWriter resultWriter = new BufferedWriter(
      new OutputStreamWriter(new FileOutputStream(new File(
        RESULT_FILENAME), true))); 
    resultWriter.write("\n"); 
    resultWriter.write(String.valueOf(raf.getFilePointer())); 
    resultWriter.close(); 
    long time = System.currentTimeMillis()-start; 
    System.out.println(time); 
    return true; 
} 

Grazie!

risposta

31

Come indicato nella documentation, non è possibile riutilizzare un ExecutorService che è stato arrestato. Lo raccomanderei contro qualsiasi soluzioni alternative , poiché (a) potrebbero non funzionare come previsto in tutte le situazioni; e (b) puoi ottenere ciò che vuoi usando le classi standard.

Dovete o

  1. un'istanza di una nuova ExecutorService; oppure

  2. non terminare lo ExecutorService.

La prima soluzione è facilmente implementabile, quindi non la descriverò in dettaglio.

Per il secondo, dal momento che si desidera eseguire un'azione una volta completate tutte le attività inoltrate, è possibile dare un'occhiata a ExecutorCompletionService e utilizzarlo. Si avvolge una ExecutorService che farà la gestione dei thread, ma i runnables otterrà avvolto in qualcosa che racconterà la ExecutorCompletionService quando hanno finito, in modo che possa riferire a voi:

ExecutorService executor = ...; 
ExecutorCompletionService ecs = new ExecutorCompletionService(executor); 

for (int i = 0; i < totalTasks; i++) { 
    ... ecs.submit(...); ... 
} 

for (int i = 0; i < totalTasks; i++) { 
    ecs.take(); 
} 

Il metodo take() sul ExecutorCompletionService la classe si bloccherà fino al termine di un'attività (normalmente o bruscamente). Restituirà un Future, quindi puoi controllare i risultati se lo desideri.

Spero che questo possa aiutarti, dal momento che non ho compreso completamente il tuo problema.

+0

come posso forzare l'arresto dei thread o annullare tutte le attività? usando exectuer.shutdownNow()? funzionerà bene anche con questo wrapper? –

3

creare e raggruppare tutte le attività, e li trasmette alla piscina con invokeAll (che restituisce solo quando tutte le attività vengano completate con successo)

+0

Qual è l'approccio migliore per utilizzare 'ExecutorCompletionService' o' invokeAll'? –

1

Dopo aver chiamato lo spegnimento su un ExecutorService, no new Task will be accepted. Questo è necessario creare un nuovo ExecutorService per ogni round di attività.

Tuttavia, è stato introdotto Java 8 ForkJoinPool.awaitQuiescence. Se è possibile passare da un normale ExecutorService a ForkJoinPool, è possibile utilizzare questo metodo per attendere fino a quando non vengono eseguite altre attività in ForkJoinPool senza dover richiamare l'arresto. Ciò significa che puoi riempire un ForkJoinPool con Task, aspettando che sia vuoto (inattivo), e poi di nuovo iniziare a riempirlo con Attività, e così via.

Problemi correlati