2016-03-03 21 views
6

L'API consumer di alto livello sembra leggere un messaggio alla volta.Kafka ha un consumatore in batch?

Questo potrebbe essere piuttosto problematico per i consumatori se vogliono elaborare e inviare tali messaggi ad altri consumatori a valle come Solr o Elastic-Search perché preferiscono avere messaggi in blocco piuttosto che uno alla volta.

Non è banale archiviare questi messaggi anche in memoria, perché gli offset in Kafka dovranno anche essere sincronizzati solo quando il batch è già impegnato, altrimenti un kafka-consumatore arrestato con messaggi downstream non inviati (come in Solr o ES) avrà i suoi offset già aggiornati e quindi messaggi allentati.

Il consumatore può consumare messaggi più volte se si blocca dopo aver inviato messaggi a valle ma prima di aggiornare gli offset dei messaggi.

Se Kafka consuma messaggi in batch, alcuni suggerimenti sul codice/documentazione saranno molto apprezzati.

Grazie!

+0

Quale versione di Kafka sono stai chiedendo? Presumo che se stai parlando di High Level Consumer è 0.8.2 o precedente. – morganw09dev

risposta

3

Non sono a conoscenza di un utente di lotto. Ma anche se c'è uno il tuo problema principale rimane. Si desidera impegnare l'offset dopo aver inoltrato correttamente i dati. Un modo per ottenere questo è disattivare il commit automatico del consumatore impostando la proprietà auto.commit.enable = false. Il compromesso è, naturalmente, che devi prenderti cura di quando commettere i tuoi offset.

Trovate una documentazione completa delle proprietà di consumo qui: https://kafka.apache.org/documentation.html#consumerconfigs

Un bel esempio di come manualy commettere l'offset rubato dalla java-doc (https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html):

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test"); 
props.put("enable.auto.commit", "false"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("session.timeout.ms", "30000"); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList("foo", "bar")); 
final int minBatchSize = 200; 
List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records) { 
     buffer.add(record); 
    } 
    if (buffer.size() >= minBatchSize) { 
     insertIntoDb(buffer); 
     consumer.commitSync(); 
     buffer.clear(); 
    } 
} 
+0

Sono d'accordo con la tua spiegazione del commit automatico. Ma per quanto riguarda il tuo codice, ConsumerRecord è una classe Kafka 0.9, mentre la sua domanda fa sembrare che stia chiedendo ai consumatori di pre 0.9. Sebbene non lo dichiari esplicitamente. – morganw09dev

+0

Questo è il problema con il codice precedente. – user2250246

+1

Se il consumatore si blocca prima di commettere gli offset, i messaggi verranno riprodotti. Non ho l'equivalente DB di beginTransaction() e endTransaction(). – user2250246

Problemi correlati