Sto scrivendo un utente che commette manualmente lo scostamento una volta che una serie di record è stata assegnata a Mongo.
Nel caso di un errore Mongo o di un altro errore, viene effettuato un tentativo di persistenza del record in un errore durante l'elaborazione della raccolta per la riproduzione in un secondo momento. Se Mongo non funziona, desidero che il consumatore interrompa l'elaborazione per un periodo di tempo prima di provare a leggere i record dall'offset non salvato di Kakfa.
Il seguente esempio funziona, ma mi piacerebbe sapere qual è la migliore pratica per questo scenario?Kafka 0.9 Come riutilizzare il messaggio quando si esegue manualmente l'offset con un KafkaConsumer
while (true) {
boolean commit = false;
try {
ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
kafkaMessageProcessor.processRecords(records);
commit = true;
}
catch (Exception e) {
logger.error("Unable to consume closing consumer and restarting", e);
try {
consumer.close();
}
catch (Exception consumerCloseError) {
logger.error("Unable to close consumer", consumerCloseError);
}
logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
Thread.sleep(recoveryInterval);
consumer = createConsumer(properties);
}
if (commit) {
consumer.commitSync();
}
}
private KafkaConsumer<K, V> createConsumer(Properties properties) {
KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
consumer.subscribe(topics);
return consumer;
}
Se non ricreare il consumatore ottengo il seguente errore.
o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer
Grazie per il vostro aiuto. Questo ha risolto il secondo problema del mio consumatore che ha ottenuto il boot. Al fine di rielaborare il messaggio invece di ricreare il consumatore consumer.seekToBeginning() può essere chiamato invece. –
consumer.seekToBeginning (partizioni) ripristinerà l'offset nella prima posizione in tutte le partizioni inviate. Non vedo come questo aiuti nel tuo caso d'uso, se resetti al mendicante dovrai rielaborare tutti gli eventi. – Nautilus
Riprenderà tutti gli eventi dall'ultimo commit di offset. Questa supposizione è errata?Voglio continuare a tentare di rielaborare fino a quando Mongo sarà di nuovo disponibile. Senza questo il sondaggio consuma solo il prossimo messaggio. –