2016-03-09 17 views
6

Stiamo aggiornando la nostra implementazione di kafka a 0,9 e utilizzando il nuovo consumer java api per creare consumer.Io sto usando il codice sotto per il consumatore e stiamo usando l'argomento setting al consumer come in LINEA A e LINEA B è la chiamata al nostro servizio che elabora i messaggi che riceviamo. Ora il problema è che stiamo ricevendo un'eccezione se l'elaborazione dei messaggi richiede più di 30 secondi.kafka upgrade a .9 con new consumer api

Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("group.id", "test-group"); 
      props.put("auto.offset.reset", "earliest"); 
      props.put("heartbeat.interval.ms", "1000"); 
      props.put("receive.buffer.bytes", 10485760); 
      props.put("fetch.message.max.bytes", 5242880); 
      props.put("enable.auto.commit", false); 
    //with partition assigned to consumer 


      KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props); 
      // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0); 
      //consumer.assign(Arrays.asList(partition0)); 
      //assign topic to consumer without partition 
//LINE A 
      consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp()); 
      List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
      while (true) { 
       try { 
        ConsumerRecords<Object, Object> records = consumer.poll(1000); 
        consumeFromQueue(records);//LINE B 
        consumer.commitSync(); 

       } catch (CommitFailedException e) { 
        e.printStackTrace(); 
        System.out.println("CommitFailedException"); 
       } catch (Exception e) { 
        e.printStackTrace(); 
        System.out.println("Exception in while consuming messages"); 
       } 

eccezione è

2016-03-03 10: 47: 35,095 INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator: Marcatura il coordinatore 2147483647 morti. 2016-03-03 10: 47: 35.096 ERRORE 6448 --- [ask-scheduler-3] oakccinternals.ConsumerCoordinator: errore ILLEGAL_GENERATION si è verificato durante il commit degli offset per il gruppo TEST-GROUP CommitFailedException org.apache.kafka.clients. consumer.CommitFailedException: Commit non può essere completato a causa del riequilibrio di gruppo in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle (ConsumerCoordinator.java:552) in org.apache.kafka.clients.consumer. internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle (ConsumerCoordinator.java:493) all'indirizzo org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:665) su org.apache.kafka.clients. consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess (AbstractCoordinator.java:644) su org.apache.kafka.clients.consumer.internals.RequestFuture $ 1.onSuccess (RequestFuture.java:167) su org.apache.kafka.clients.consumer.internals. RequestFuture.fireSuccess (RequestFuture.java:133) all'indirizzo org.apache.kafka.clients.consumer.internals.RequestFuture.complete (RequestFuture.java:107) all'indirizzo org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient $ RequestFutureCompletionHandler.onComplete (ConsumerNetworkClient.java:380) in org.apache.kafka.clients.NetworkClient.poll (NetworkClient.java:274) in org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll (ConsumerNetworkClient. java: 320) su org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetwo rkClient.java:213)

Sopra l'eccezione arriva mentre si commuta offset. Qualsiasi suggerimento può essere d'aiuto

risposta

6

Ciò accade perché il nuovo utente è a thread singolo e l'unico modo per mantenere l'heartbeat con il gruppo di utenti è il polling o il commit dell'offset, dopo 30 secondi il coordinatore di gruppo sta marcando il tuo consumatore come morto e richiede un riequilibrio di gruppo. Per questa situazione è possibile aumentare lo request.timeout.ms o dividere il lavoro di consumo e di elaborazione tra 2 thread.

+0

Grazie per la risposta, ho provato ad aggiungere "request.timeout.ms" a 70000, mi ha dato la stessa eccezione anche se l'elaborazione dei messaggi ha richiesto 30000. –

+0

Non sono sicuro del motivo per cui non ha alcun effetto. Ho persino provato a impostare "session.timeout.ms" su 70000 ma mi ha dato un'eccezione dicendo "org.apache.kafka.common.errors.ApiException: il timeout della sessione non rientra nell'intervallo accettabile." Stavo pensando al tuo secondo suggerimento, ma se qualcosa va storto nell'elaborazione dei thread come gestirlo? aggiungendolo nuovamente all'argomento come nuovo messaggio e ripristinando le modifiche causate dal messaggio prima dell'eccezione? –

+0

aumentare anche il group.max.session.timeout.ms – Nautilus

0

Si potrebbe limitare il numero di messaggi restituiti da sondaggio() impostando

max.partition.fetch.bytes 

ad una certa soglia adatto che è più grande del vostro più grande messaggio, ma così basso che si otterrà meno messaggi al sondaggio.

Kafka 0.10.x ha il supporto per limitare in modo esplicito il numero di messaggi restituiti al client impostando

max.poll.records