Stiamo aggiornando la nostra implementazione di kafka a 0,9 e utilizzando il nuovo consumer java api per creare consumer.Io sto usando il codice sotto per il consumatore e stiamo usando l'argomento setting al consumer come in LINEA A e LINEA B è la chiamata al nostro servizio che elabora i messaggi che riceviamo. Ora il problema è che stiamo ricevendo un'eccezione se l'elaborazione dei messaggi richiede più di 30 secondi.kafka upgrade a .9 con new consumer api
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880);
props.put("enable.auto.commit", false);
//with partition assigned to consumer
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
//consumer.assign(Arrays.asList(partition0));
//assign topic to consumer without partition
//LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
try {
ConsumerRecords<Object, Object> records = consumer.poll(1000);
consumeFromQueue(records);//LINE B
consumer.commitSync();
} catch (CommitFailedException e) {
e.printStackTrace();
System.out.println("CommitFailedException");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Exception in while consuming messages");
}
eccezione è
2016-03-03 10: 47: 35,095 INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator: Marcatura il coordinatore 2147483647 morti. 2016-03-03 10: 47: 35.096 ERRORE 6448 --- [ask-scheduler-3] oakccinternals.ConsumerCoordinator: errore ILLEGAL_GENERATION si è verificato durante il commit degli offset per il gruppo TEST-GROUP CommitFailedException org.apache.kafka.clients. consumer.CommitFailedException: Commit non può essere completato a causa del riequilibrio di gruppo in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle (ConsumerCoordinator.java:552) in org.apache.kafka.clients.consumer. internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle (ConsumerCoordinator.java:493) all'indirizzo org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:665) su org.apache.kafka.clients. consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:644) su org.apache.kafka.clients.consumer.internals.RequestFuture $ 1.onSuccess (RequestFuture.java:167) su org.apache.kafka.clients.consumer.internals. RequestFuture.fireSuccess (RequestFuture.java:133) all'indirizzo org.apache.kafka.clients.consumer.internals.RequestFuture.complete (RequestFuture.java:107) all'indirizzo org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient $ RequestFutureCompletionHandler.onComplete (ConsumerNetworkClient.java:380) in org.apache.kafka.clients.NetworkClient.poll (NetworkClient.java:274) in org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll (ConsumerNetworkClient. java: 320) su org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetwo rkClient.java:213)
Sopra l'eccezione arriva mentre si commuta offset. Qualsiasi suggerimento può essere d'aiuto
Grazie per la risposta, ho provato ad aggiungere "request.timeout.ms" a 70000, mi ha dato la stessa eccezione anche se l'elaborazione dei messaggi ha richiesto 30000. –
Non sono sicuro del motivo per cui non ha alcun effetto. Ho persino provato a impostare "session.timeout.ms" su 70000 ma mi ha dato un'eccezione dicendo "org.apache.kafka.common.errors.ApiException: il timeout della sessione non rientra nell'intervallo accettabile." Stavo pensando al tuo secondo suggerimento, ma se qualcosa va storto nell'elaborazione dei thread come gestirlo? aggiungendolo nuovamente all'argomento come nuovo messaggio e ripristinando le modifiche causate dal messaggio prima dell'eccezione? –
aumentare anche il group.max.session.timeout.ms – Nautilus