2015-06-17 17 views
20

Sono un nuovo studente che studia Kafka e mi sono imbattuto in alcuni problemi fondamentali con la comprensione di più consumatori che articoli, documentazioni, ecc. Non sono stati troppo utili finora.Come utilizzo più utenti in Kafka?

Una cosa che ho cercato di fare è scrivere il mio produttore e consumatore Kafka di alto livello e gestirli simultaneamente, pubblicando 100 semplici messaggi su un argomento e chiedendoli al mio consumatore di recuperarli. Sono riuscito a farlo con successo, ma quando provo a introdurre un secondo consumatore a consumare dallo stesso argomento in cui i messaggi sono stati appena pubblicati, non riceve alcun messaggio.

Era a mia conoscenza che per ogni argomento potevi avere consumatori da gruppi separati di consumatori e ognuno di questi gruppi di consumatori avrebbe ricevuto una copia completa dei messaggi prodotti su qualche argomento. È corretto? In caso contrario, quale sarebbe il modo corretto per me di creare più consumatori? Questa è la classe di consumatori che ho scritto finora:

public class AlternateConsumer extends Thread { 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 
    private final Boolean isAsync = false; 

    public AlternateConsumer(String topic, String consumerGroup) { 
     Properties properties = new Properties(); 
     properties.put("bootstrap.servers", "localhost:9092"); 
     properties.put("group.id", consumerGroup); 
     properties.put("partition.assignment.strategy", "roundrobin"); 
     properties.put("enable.auto.commit", "true"); 
     properties.put("auto.commit.interval.ms", "1000"); 
     properties.put("session.timeout.ms", "30000"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumer = new KafkaConsumer<Integer, String>(properties); 
     consumer.subscribe(topic); 
     this.topic = topic; 
    } 


    public void run() { 
     while (true) { 
      ConsumerRecords<Integer, String> records = consumer.poll(0); 
      for (ConsumerRecord<Integer, String> record : records) { 
       System.out.println("We received message: " + record.value() + " from topic: " + record.topic()); 
      } 
     } 

    } 
} 

Inoltre, ho notato che in origine stavo testando il consumo al di sopra di un 'test' argomento con una sola partizione. Quando ho aggiunto un altro consumatore a un gruppo di consumatori esistente che diceva "testGroup", questo ha innescato un riequilibrio di Kafka che ha rallentato la latenza del mio consumo di una quantità significativa, nell'ordine dei secondi. Ho pensato che si trattasse di un problema di ribilanciamento poiché avevo un'unica partizione, ma quando ho creato un nuovo argomento "multiplepartitions" con 6 partizioni, sono emersi problemi simili in cui l'aggiunta di più utenti allo stesso gruppo di consumatori ha causato problemi di latenza. Mi sono guardato intorno e la gente mi sta dicendo che dovrei usare un consumatore multi-thread: qualcuno può far luce su questo?

+0

C'è un grande esempio di un consumatore di alto livello [qui] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) per kafka '0.8.1'. – chrsblck

+0

@chrsblck grazie per il collegamento.L'ho già esaminato in precedenza e probabilmente non l'ho capito come avrei potuto - potresti forse spiegare un po 'come quell'esempio fa uso dei thread? Al momento non capisco cosa stiano facendo. –

+0

Un modo è quello di avere lo stesso numero di thread delle partizioni per un determinato argomento. Dall'articolo - Grab una lista di stream 'List > stream = consumerMap.get (topic);' ... Quindi assegna ad ogni thread una partizione 'executor.submit (new ConsumerTest (stream, threadNumber)) '. – chrsblck

risposta

17

Penso che il problema si trovi con la proprietà auto.offset.reset. Quando un nuovo utente legge da una partizione e non vi è alcun offset commesso precedente, la proprietà auto.offset.reset viene utilizzata per decidere quale deve essere l'offset iniziale. Se lo si imposta su "più grande" (impostazione predefinita), si inizia a leggere all'ultimo messaggio (ultimo). Se lo si imposta su "più piccolo" si ottiene il primo messaggio disponibile.

Quindi aggiungere:

properties.put("auto.offset.reset", "smallest"); 

e riprovare.

+1

Questa è una risposta tardiva, ma grazie Chris! Le tue soluzioni sono corrette e dopo aver esaminato più da vicino alcuni documenti avrei dovuto notare che all'avvio di un nuovo consumatore è impostato per consumare solo i messaggi inviati più recenti - NON preesistenti a meno che non siano impostate le proprietà precedenti. –

4

Nella documentazione here si dice: "se si forniscono più thread di quante sono le partizioni sull'argomento, alcuni thread non vedranno mai un messaggio". Puoi aggiungere partizioni al tuo argomento? Ho il mio numero di thread del gruppo di consumatori uguale al numero di partizioni nel mio argomento e ogni thread riceve i messaggi.

Ecco il mio argomento config:

buffalo-macbook10:kafka_2.10-0.8.2.1 aakture$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic recent-wins 
Topic:recent-wins PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: recent-wins Partition: 0 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: recent-wins Partition: 2 Leader: 0 Replicas: 0 Isr: 0 

E il mio consumatore:

package com.cie.dispatcher.services; 

import com.cie.dispatcher.model.WinNotification; 
import com.fasterxml.jackson.databind.ObjectMapper; 
import com.google.inject.Inject; 
import io.dropwizard.lifecycle.Managed; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

/** 
* This will create three threads, assign them to a "group" and listen for notifications on a topic. 
* Current setup is to have three partitions in Kafka, so we need a thread per partition (as recommended by 
* the kafka folks). This implements the dropwizard Managed interface, so it can be started and stopped by the 
* lifecycle manager in dropwizard. 
* <p/> 
* Created by aakture on 6/15/15. 
*/ 
public class KafkaTopicListener implements Managed { 
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicListener.class); 
private final ConsumerConnector consumer; 
private final String topic; 
private ExecutorService executor; 
private int threadCount; 
private WinNotificationWorkflow winNotificationWorkflow; 
private ObjectMapper objectMapper; 

@Inject 
public KafkaTopicListener(String a_zookeeper, 
          String a_groupId, String a_topic, 
          int threadCount, 
          WinNotificationWorkflow winNotificationWorkflow, 
          ObjectMapper objectMapper) { 
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
      createConsumerConfig(a_zookeeper, a_groupId)); 
    this.topic = a_topic; 
    this.threadCount = threadCount; 
    this.winNotificationWorkflow = winNotificationWorkflow; 
    this.objectMapper = objectMapper; 
} 

/** 
* Creates the config for a connection 
* 
* @param zookeeper the host:port for zookeeper, "localhost:2181" for example. 
* @param groupId the group id to use for the consumer group. Can be anything, it's used by kafka to organize the consumer threads. 
* @return the config props 
*/ 
private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", zookeeper); 
    props.put("group.id", groupId); 
    props.put("zookeeper.session.timeout.ms", "400"); 
    props.put("zookeeper.sync.time.ms", "200"); 
    props.put("auto.commit.interval.ms", "1000"); 

    return new ConsumerConfig(props); 
} 

public void stop() { 
    if (consumer != null) consumer.shutdown(); 
    if (executor != null) executor.shutdown(); 
    try { 
     if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 
      LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
     } 
    } catch (InterruptedException e) { 
     LOG.info("Interrupted during shutdown, exiting uncleanly"); 
    } 
    LOG.info("{} shutdown successfully", this.getClass().getName()); 
} 
/** 
* Starts the listener 
*/ 
public void start() { 
    Map<String, Integer> topicCountMap = new HashMap<>(); 
    topicCountMap.put(topic, new Integer(threadCount)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 
    executor = Executors.newFixedThreadPool(threadCount); 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ListenerThread(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

private class ListenerThread implements Runnable { 
    private KafkaStream m_stream; 
    private int m_threadNumber; 

    public ListenerThread(KafkaStream a_stream, int a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     try { 
      String message = null; 
      LOG.info("started listener thread: {}", m_threadNumber); 
      ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
      while (it.hasNext()) { 
       try { 
        message = new String(it.next().message()); 
        LOG.info("receive message by " + m_threadNumber + " : " + message); 
        WinNotification winNotification = objectMapper.readValue(message, WinNotification.class); 
        winNotificationWorkflow.process(winNotification); 
       } catch (Exception ex) { 
        LOG.error("error processing queue for message: " + message, ex); 
       } 
      } 
      LOG.info("Shutting down listener thread: " + m_threadNumber); 
     } catch (Exception ex) { 
      LOG.error("error:", ex); 
     } 
    } 
    } 
} 
+0

Puoi condividere l'esempio per la versione 1.0 di Kafka, poiché la maggior parte delle classi utilizzate nell'esempio sopra sono deprecate. –

+0

Non credo che fosse fuori in quel momento, potrei non riuscire ad aggiornare il mio codice molto presto, scuse. –

4

Se si desidera che più i consumatori a consumare stessi messaggi (come una trasmissione), è possibile li deporre le uova con diversi gruppo di consumatori e anche settando auto.offset.reset sul più piccolo nella configurazione del consumatore. Se desideri che più consumatori finiscano di consumare in parallelo (dividi il lavoro tra di loro), devi creare il numero di partizioni> = numero di consumatori. Una partizione può essere consumata solo al massimo da un processo consumatore. Ma un consumatore può consumare più di una partizione.