2016-01-20 19 views
8

Sto scrivendo un utente che commette manualmente lo scostamento una volta che una serie di record è stata assegnata a Mongo.
Nel caso di un errore Mongo o di un altro errore, viene effettuato un tentativo di persistenza del record in un errore durante l'elaborazione della raccolta per la riproduzione in un secondo momento. Se Mongo non funziona, desidero che il consumatore interrompa l'elaborazione per un periodo di tempo prima di provare a leggere i record dall'offset non salvato di Kakfa.
Il seguente esempio funziona, ma mi piacerebbe sapere qual è la migliore pratica per questo scenario?Kafka 0.9 Come riutilizzare il messaggio quando si esegue manualmente l'offset con un KafkaConsumer

while (true) { 
    boolean commit = false; 
    try { 
     ConsumerRecords<K, V> records = consumer.poll(consumerTimeout); 
     kafkaMessageProcessor.processRecords(records); 
     commit = true; 
    } 
    catch (Exception e) { 
     logger.error("Unable to consume closing consumer and restarting", e); 
     try { 
      consumer.close(); 
     } 
     catch (Exception consumerCloseError) { 
      logger.error("Unable to close consumer", consumerCloseError); 
     } 
     logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e); 
     Thread.sleep(recoveryInterval); 
     consumer = createConsumer(properties); 
    } 
    if (commit) { 
     consumer.commitSync(); 
    } 

} 

private KafkaConsumer<K, V> createConsumer(Properties properties) { 
    KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties); 
    consumer.subscribe(topics); 
    return consumer; 
} 

Se non ricreare il consumatore ottengo il seguente errore.

o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead. 
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer 

risposta

5

Se non ha commesso l'offset e la proprietà auto.commit.enable è FALSE poi, quando la chiamata a Mongo non solo attendere il tempo che si pensa è necessario e riprovare per eseguire il polling().

Il problema che state vedendo è che il nuovo consumatore utilizza il poll() come meccanismo di battito cardiaco, per cui se si attende più a lungo che la richiesta di timeout poi il coordinatore per l'argomento sarà kickout il consumatore, perché penserà è morto e riequilibrerà il gruppo. Quindi aspettate il mongo ma potreste voler eseguire il poll() per un po '.

EDIT: Come soluzione alternativa si può mettere questa proprietà request.timeout.ms superiori

Speranza che aiuta!

+0

Grazie per il vostro aiuto. Questo ha risolto il secondo problema del mio consumatore che ha ottenuto il boot. Al fine di rielaborare il messaggio invece di ricreare il consumatore consumer.seekToBeginning() può essere chiamato invece. –

+0

consumer.seekToBeginning (partizioni) ripristinerà l'offset nella prima posizione in tutte le partizioni inviate. Non vedo come questo aiuti nel tuo caso d'uso, se resetti al mendicante dovrai rielaborare tutti gli eventi. – Nautilus

+0

Riprenderà tutti gli eventi dall'ultimo commit di offset. Questa supposizione è errata?Voglio continuare a tentare di rielaborare fino a quando Mongo sarà di nuovo disponibile. Senza questo il sondaggio consuma solo il prossimo messaggio. –

1

a quanto ho capito, il (nuovo) client è quello che mantiene gli offset consumati. Il commit invia gli offset al server, ma non ha alcun effetto sul prossimo sondaggio da quel client, dal momento che il client dice al server "dammi i prossimi messaggi su THAT offset". Perché l'offset viene inviato al server? Per il prossimo ribilanciamento. Quindi l'unico server della situazione utilizza gli offset commessi quando un client muore/disconnette, quindi le partizioni vengono ribilanciate e con questo riequilibrio i client ottengono gli offset dal server.

Quindi, se non si esegue il commit dell'offset e quindi si chiama poll(), non è possibile aspettarsi che il messaggio venga nuovamente letto. Per questo ci dovrebbe essere la possibilità di rollback dell'offset nel client. Non ho provato, ma penso che chiamare KafkaConsumer.seek all'offset del messaggio fallito dovrebbe fare il trucco.

https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek(org.apache.kafka.common.TopicPartition,%20long)

BTW, in questo modo si può anche commettere l'ultimo messaggio con successo elaborati e cercare di primo fallito, in modo che non è necessario ripetere l'intero elenco di record, quando il fallimento si è verificato per qualche messaggio nel mezzo di esso.

6

Ecco il mio codice utilizzando la versione 0.10.0 del client.

Sembra ok per la tua richiesta.

import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicBoolean; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.consumer.OffsetAndMetadata; 
import org.apache.kafka.clients.consumer.OffsetCommitCallback; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class MessageProcesser implements Runnable { 

    private static Logger logger = LoggerFactory.getLogger(MessageProcesser.class); 

    private final ExecutorService pool = Executors.newFixedThreadPool(4); 

    private final KafkaConsumer<String, String> consumer; 

    private final String topic; 

    private final AtomicBoolean closed = new AtomicBoolean(false); 

    public MessageProcesser(String groupId, String topic, String kafkaServer) { 
     this.topic = topic; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", kafkaServer); 
     props.put("group.id", groupId); 
     props.put("key.deserializer", StringDeserializer.class.getName()); 
     props.put("value.deserializer", StringDeserializer.class.getName()); 
     props.put("enable.auto.commit", "false"); 
     this.consumer = new KafkaConsumer<>(props); 
    } 

    @Override 
    public void run() { 
     try { 

      consumer.subscribe(Collections.singleton(topic)); 

      while (true) { 
       if (closed.get()) { 
        consumer.close(); 
       } 

       ConsumerRecords<String, String> records = consumer.poll(1000 * 60); 
       for (ConsumerRecord<String, String> record : records) { 

        String value = record.value(); 
        if (null == value) { 
         continue; 
        } 

        boolean processResult = false; 
        try { 
         Future<Object> f = pool.submit(new ProcessCommand(value)); 
         processResult = (boolean) f.get(100, TimeUnit.MILLISECONDS); 
        } catch (Exception e) { 
         logger.error(e.getMessage(), e); 
        } 

        if (!processResult) { 
         //here if process fail, seek to current offset 
         consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset()); 
        } else { 
         this.commitAsyncOffset(record); 
        } 
       } 
      } 
     } catch (Exception e) { 
      logger.error(e.getMessage(), e); 
      if (!closed.get()) { 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e1) { 
        // ignore 
       } 
      } 
     } 
    } 

    public void shutdown() { 
     closed.set(true); 
    } 

    public void commitAsyncOffset(ConsumerRecord<String, String> record) { 
     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); 
     offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); 

     consumer.commitAsync(offsets, new OffsetCommitCallback() { 
      @Override 
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { 
       if (e != null) { 
        logger.error("kafka offset commit fail. {} {}", offsets, PushUtil.getStackString(e.getStackTrace())); 
       } 
      } 
     }); 
    } 
} 
+0

Signore, la ricerca è necessaria? –

+0

sì, cercare è nesscessory. Il client Java ricorda l'attuale offset. – Hlex

+0

Questo codice ha problemi se i record sono multipli e si verifica un errore. non si dovrebbe elaborare il record successivo con la stessa partizione. – Hlex

Problemi correlati