2013-08-15 12 views
8

Utilizzando Primavera lotto 2.2.1, ho configurato un lavoro batch primavera, ho usato questo approccio:Primavera lotto: Tasklet con multi esecutore filettato ha pessime prestazioni legate alla Throttling algoritmo

configurazione è la seguente:

  • Tasklet utilizza ThreadPoolTaskExecutor limitata a 15 fili

  • strozzare-limite è pari al numero di fili

  • Chunk utilizzato con:

    • 1 adattatore sincronizzato di JdbcCursorItemReader per consentirne l'uso da molti fili secondo Spring documentazione batch recommandation

      È possibile sincronizzare la chiamata a read() e fintanto che l'elaborazione e la scrittura rappresentano la parte più costosa del blocco, il passaggio potrebbe comunque completare molto ter che in una configurazione a thread singolo.

    • saveState è falso su JdbcCursorItemReader

    • A Custom ItemWriter sulla base di JPA. Si noti che l'elaborazione di un elemento può variare in termini di tempo di elaborazione, può richiedere alcuni millisimi per alcuni secondi (> 60 secondi).

    • commit-intervallo impostato a 1 (lo so che potrebbe essere migliore, ma non è questo il problema)

  • Tutte le piscine JDBC vanno bene, per quanto riguarda primavera Lotto DOC recommandation

L'esecuzione del batch porta a risultati molto strani e cattivi a causa di:

  • a un certo punto, se gli elementi richiedono un po 'di tempo per l'elaborazione da parte di uno scrittore, quasi tutti i thread nel pool di thread non eseguono nulla invece di elaborare, solo il writer lento funziona.

Guardando Codice del lotto Primavera, causa principale sembra essere in questo pacchetto:

  • org/Spring Framework/batch/ripetizione/supporto/

È questo modo di lavorare una caratteristica o è una limitazione/bug?

Se si tratta di una funzione, in che modo la configurazione consente di rendere tutti i thread senza essere affamati da un lungo lavoro di elaborazione senza dover riscrivere tutto?

Si noti che se tutti gli elementi prendono allo stesso tempo, tutto funziona correttamente e il multi-threading è OK, ma se uno dei processi di elaborazione richiede molto più tempo, il multi-threading è quasi inutile per il tempo in cui il processo lento funziona .

Nota ho aperto questo problema:

risposta

5

Come ha detto Alex, sembra che questo comportamento è un contratto come da javadocs di:

sottoclassi solo bisogno di fornire un metodo che ottiene il risultato successivo * e uno che attende tutti i risultati essere tornato da processi simultanei * o thread

un'occhiata a:

TaskExecutorRepeatTemplate # waitForResults

Un'altra opzione per voi sarebbe quella di utilizzare Partitioning:

  • Un TaskExecutorPartitionHandler che eseguirà elementi da Partitionned ItemReader, vedi sotto
  • Un'implementazione di partizionamento che dà gli intervalli per essere trattati da ItemReader, vedere ColumnRangePartitioner sotto
  • Un CustomReader che leggerà i dati utilizzando ciò che il Partizionatore avrà riempito, vedere la configurazione di myItemReader sotto

Michael Minella spiega nel capitolo 11 del suo libro Pro Spring Batch:

<batch:job id="batchWithPartition"> 
    <batch:step id="step1.master"> 
     <batch:partition partitioner="myPartitioner" handler="partitionHandler"/> 
    </batch:step>  
</batch:job> 
<!-- This one will create Paritions of Number of lines/ Grid Size--> 
<bean id="myPartitioner" class="....ColumnRangePartitioner"/> 
<!-- This one will handle every partition in a Thread --> 
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler"> 
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/> 
    <property name="step" ref="step1" /> 
    <property name="gridSize" value="10" /> 
</bean> 
<batch:step id="step1"> 
     <batch:tasklet transaction-manager="transactionManager"> 
      <batch:chunk reader="myItemReader" 
       writer="manipulatableWriterForTests" commit-interval="1" 
       skip-limit="30000"> 
       <batch:skippable-exception-classes> 
        <batch:include class="java.lang.Exception" /> 
       </batch:skippable-exception-classes> 
      </batch:chunk> 
     </batch:tasklet> 
</batch:step> 
<!-- scope step is critical here--> 
<bean id="myItemReader"  
         class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step"> 
    <property name="dataSource" ref="dataSource"/> 
    <property name="sql"> 
     <value> 
      <![CDATA[ 
       select * from customers where id >= ? and id <= ? 
      ]]> 
     </value> 
    </property> 
    <property name="preparedStatementSetter"> 
     <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter"> 
      <property name="parameters"> 
       <list> 
<!-- minValue and maxValue are filled in by Partitioner for each Partition in an ExecutionContext--> 
        <value>{stepExecutionContext[minValue]}</value> 
        <value>#{stepExecutionContext[maxValue]}</value> 
       </list> 
      </property> 
     </bean> 
    </property> 
    <property name="rowMapper" ref="customerRowMapper"/> 
</bean> 

Partitioner.java:

package ...; 
    import java.util.HashMap; 
import java.util.Map; 
import org.springframework.batch.core.partition.support.Partitioner; 
import org.springframework.batch.item.ExecutionContext; 
public class ColumnRangePartitioner implements Partitioner { 
private String column; 
private String table; 
public Map<String, ExecutionContext> partition(int gridSize) { 
    int min = queryForInt("SELECT MIN(" + column + ") from " + table); 
    int max = queryForInt("SELECT MAX(" + column + ") from " + table); 
    int targetSize = (max - min)/gridSize; 
    System.out.println("Our partition size will be " + targetSize); 
    System.out.println("We will have " + gridSize + " partitions"); 
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); 
    int number = 0; 
    int start = min; 
    int end = start + targetSize - 1; 
    while (start <= max) { 
     ExecutionContext value = new ExecutionContext(); 
     result.put("partition" + number, value); 
     if (end >= max) { 
      end = max; 
     } 
     value.putInt("minValue", start); 
     value.putInt("maxValue", end); 
     System.out.println("minValue = " + start); 
     System.out.println("maxValue = " + end); 
     start += targetSize; 
     end += targetSize; 
     number++; 
    } 
    System.out.println("We are returning " + result.size() + " partitions"); 
    return result; 
} 
public void setColumn(String column) { 
    this.column = column; 
} 
public void setTable(String table) { 
    this.table = table; 
} 
} 
+0

Questo post è ottimo perché mostra l'esempio xml per configurare un passo partizionato, molto utile se non si ha a portata di mano il libro! – EdgeCaseBerg

3

Ecco cosa penso sta succedendo:

  • Come hai detto, il vostro ThreadPoolTaskExecutor è limitata a 15 discussioni
  • Il "blocco" del framework sta causando ogni elemento in JdbcCursorItemReader (fino al thread lim it) da eseguire in un thread diverso
  • Ma il framework Spring Batch attende anche che ciascuno dei thread (ovvero tutti i 15) completi il ​​proprio flusso di lettura/elaborazione/scrittura prima di passare al blocco successivo, dato il proprio intervallo di commit di 1. In alcune circostanze, ciò causa che 14 thread attendono quasi 60 secondi su un thread di pari livello che richiede sempre un completamento.

In altre parole, affinché questo approccio multi-thread in Spring Batch sia utile, ogni thread deve essere elaborato in circa lo stesso tempo. Dato il tuo scenario in cui c'è un'enorme disparità tra il tempo di elaborazione di alcuni articoli, stai riscontrando una limitazione in cui molti dei tuoi thread sono completi e in attesa su un thread gemello di lunga durata per essere in grado di passare al prossimo pezzo di elaborazione.

Il mio suggerimento:

  • In generale, direi che aumentare il vostro intervallo di commit dovrebbe aiutare un po ', dal momento che dovrebbe consentire più di un elemento del cursore da elaborare in un singolo thread in mezzo impegna anche se uno dei thread è bloccato su una scrittura di lunga durata. Tuttavia, se sei sfortunato, potrebbero verificarsi più transazioni lunghe nello stesso thread e peggiorare le cose (ad es. 120 secondi tra commit in un singolo thread per un intervallo di commit di 2).
  • In particolare, suggerirei di aumentare le dimensioni del pool di thread su un numero elevato, superando anche le connessioni massime del database di 2x o 3x. Ciò che dovrebbe accadere è che anche se alcuni dei thread bloccano il tentativo di acquisire una connessione (a causa delle dimensioni del pool di thread di grandi dimensioni), si vedrà effettivamente un aumento della velocità effettiva poiché i thread di lunga esecuzione non interromperanno più altri thread prendendo nuovi elementi dal cursore e continuando il lavoro del lavoro batch nel frattempo (all'inizio di un blocco, il numero di thread in attesa supererà di molto il numero di connessioni database disponibili. Quindi lo scheduler del sistema operativo si agiterà un po 'mentre attiva i thread che sono bloccati sull'acquisizione di una connessione al database e devono disattivare il thread.Tuttavia, dal momento che la maggior parte dei tuoi thread completerà il loro lavoro e rilascerà la connessione al database in modo relativamente veloce, dovresti vedere che nel complesso il tuo throughput è migliorato man mano che molti thread continuano ad acquisire connessioni al database , lavorando, rilasciando connessioni al database e consentendo a ulteriori thread di fare lo stesso anche mentre i thread di vecchia data stanno facendo il loro dovere) .
+0

Grazie per la risposta, potresti indicarmi il codice o la documentazione che dice "Il framework Spring Batch è in attesa anche di ciascuno dei thread (cioè di tutti i 15) per completare il flusso di lettura/elaborazione/scrittura individuale prima di passare al successivo pezzo "? Mi piace la tua idea di workaround, grazie. Ma diresti che è un abuso o SpringBatch o un problema in Spring Batch? – pmpm

+0

Un seguito alla mia risposta precedente - se si prova ad espandere il proprio pool di thread nel modo in cui ho suggerito si potrebbe iniziare a ottenere un sacco di transazioni di database senza commit dal momento che ogni thread elaborerà un blocco tra commit. Solo qualcosa da considerare come un compromesso. – Alex

1

Nel mio caso, se io don 't impostare il limite del throttle, quindi solo 4 thread entrano in read() metodo di ItemReader che è anche il numero predefinito di thread, se non specificato nel tag tasklet come da documentazione Spring Batch.

Se specifico più fili pe 10 o 20 o 100, allora solo 8 fili entrano metodo read() di ItemReader

+0

Hai ottenuto una soluzione solo per 8 thread? O lo stai ancora utilizzando con 8 thread? – Ani

1

Il limite di 8 thread attivi indipendentemente dal valore della farfalla limite potrebbe essere causato da contesa sul repository di Spring Batch Job. Ogni volta che viene elaborato un blocco, alcune informazioni vengono scritte nel repository di lavoro. Aumentare le dimensioni del pool per adattarsi al numero di thread necessari!

+0

Come si aumenta la "dimensione del pool" del repository di Spring Batch Job? AFAICT il codice in TaskletStep si blocca su un semaforo con un solo permesso, prima di aggiornare il JobRepository. –

Problemi correlati