2013-06-04 15 views
5

Desidero utilizzare l'API bulk di elasticsearch utilizzando java e chiedendomi come impostare la dimensione del batch.elasticsearch java bulk batch size

Attualmente sto usando come:

BulkRequestBuilder bulkRequest = getClient().prepareBulk(); 
while(hasMore) { 
    bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json)); 
    hasMore = checkHasMore(); 
} 
BulkResponse bResp = bulkRequest.execute().actionGet(); 
//To check failures 
log.info("Has failures? {}", bResp.hasFailures()); 

Qualsiasi idea di come posso impostare la dimensione di massa/batch?

+1

prega, contrassegnare la risposta come corretta ..... –

risposta

21

Dipende principalmente dalla dimensione dei documenti, dalle risorse disponibili sul client e dal tipo di client (client di trasporto o client nodo).

Il client del nodo è a conoscenza dei frammenti sul cluster e invia i documenti direttamente ai nodi che contengono i frammenti dove dovrebbero essere indicizzati. D'altra parte il client di trasporto è un normale client che invia le sue richieste a un elenco di nodi in modalità round robin. La richiesta di massa verrà quindi inviata a un nodo, che diventerà il gateway durante l'indicizzazione.

Poiché si utilizza l'API Java, suggerisco di dare un'occhiata allo BulkProcessor, il che rende molto più facile e flessibile l'indicizzazione in blocco. È possibile definire un numero massimo di azioni, una dimensione massima e un intervallo di tempo massimo dall'ultima esecuzione di massa. Sta per eseguire automaticamente il carico automaticamente quando necessario. È anche possibile impostare un numero massimo di richieste simultanee in blocco.

Dopo aver creato il BulkProcessor in questo modo:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { 
    @Override 
    public void beforeBulk(long executionId, BulkRequest request) { 
     logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions()); 
    } 

    @Override 
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 
     logger.info("Executed bulk composed of {} actions", request.numberOfActions()); 
    } 

    @Override 
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 
     logger.warn("Error executing bulk", failure); 
    } 
    }).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build(); 

Non vi resta che aggiungere le vostre richieste ad esso:

bulkProcessor.add(indexRequest); 

e chiuderlo alla fine per irrigare eventuali richieste che potrebbero avere non ancora eseguito:

bulkProcessor.close(); 

Per rispondere finalmente alla tua domanda: th La cosa bella del BulkProcessor è anche che ha delle impostazioni predefinite: 5 MB di dimensioni, 1000 azioni, 1 richiesta simultanea, nessun intervallo di risciacquo (che potrebbe essere utile impostare).

0

è necessario contare il generatore di richieste in blocco quando raggiunge il limite di dimensione del batch, indicizzarlo e svuotare le versioni di massa più vecchie. qui è esempio di codice

Settings settings = ImmutableSettings.settingsBuilder() 
    .put("cluster.name", "MyClusterName").build(); 

TransportClient client = new TransportClient(settings); 
String hostname = "myhost ip"; 
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port)); 

BulkRequestBuilder bulkBuilder = client.prepareBulk(); 
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path")))); 
long bulkBuilderLength = 0; 
String readLine = ""; 
String index = "my_index_name"; 
String type = "my_type_name"; 
String id = ""; 

while((readLine = br.readLine()) != null){ 
    id = somefunction(readLine); 
    String json = new ObjectMapper().writeValueAsString(readLine); 
    bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json)); 
    bulkBuilderLength++; 
    if(bulkBuilderLength % 1000== 0){ 
     logger.info("##### " + bulkBuilderLength + " data indexed."); 
     BulkResponse bulkRes = bulkBuilder.execute().actionGet(); 
     if(bulkRes.hasFailures()){ 
     logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); 
     } 
     bulkBuilder = client.prepareBulk(); 
    } 
} 

br.close(); 

if(bulkBuilder.numberOfActions() > 0){ 
    logger.info("##### " + bulkBuilderLength + " data indexed."); 
    BulkResponse bulkRes = bulkBuilder.execute().actionGet(); 
    if(bulkRes.hasFailures()){ 
     logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); 
    } 
    bulkBuilder = client.prepareBulk(); 
} 

speranza che questo ti aiuta grazie