2014-05-11 27 views
7

Scrivo Kafka Consumer per applicazioni distribuite ad alta velocità ad alto volume. Ho solo un argomento, ma i messaggi in arrivo sono molto alti. Avere più partizioni che servono più consumatori sarebbe appropriato per questo caso d'uso. Il modo migliore per consumare è avere più lettori di streaming. Come da documentazione o campioni disponibili, il numero di KafkaStreams che ConsumerConnector emette si basa sul numero di argomenti. Ti chiedi come ottenere più di un lettore KafkaStream [basato sulla partizione], in modo che io possa estrapolare un thread per flusso o Leggere dallo stesso KafkaStream in più thread farebbe la lettura simultanea da più partizioni?Apache Kafka - KafkaStream su argomento/partizione

Qualsiasi approfondimento è molto apprezzato.

+0

Utilizzando SimpleConsumer non è un'opzione? –

risposta

14

piacerebbe condividere quello che ho trovato dalla mailing list:

Il numero che si passa nei controlli mappa argomento quanti sono i flussi di un argomento è diviso in. Nel tuo caso, se passi in 1, tutti i dati delle 10 partizioni verranno inseriti in 1 stream. Se si passa in 2, ciascuno dei 2 flussi riceverà i dati da 5 partizioni. Se passi 11, 10 riceveranno i dati da 1 partizione e 1 stream non otterrà nulla.

In genere, è necessario iterare ogni flusso nella propria thread. Questo perché ogni stream può bloccare per sempre se non ci sono nuovi eventi.

frammento

Esempio:

topicCount.put(msgTopic, new Integer(partitionCount)); 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount); 
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic); 

for (final KafkaStream stream : streams) { 
    ReadTask task = new ReadTask(stream, msgTopic); 
    task.addObserver(this.msgObserver); 
    tasks.add(task); executor.submit(task); 
} 

Riferimento: http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%[email protected].com%3E

+0

snippet di esempio topicCount.put (msgTopic, new Integer (partitionCount)); Mappa >> consumerStreams = connector.createMessageStreams (topicCount); Elenco > flussi = consumerStreams.get (msgTopic); per (flusso KafkaStream finale: flussi) { Attività ReadTask = new ReadTask (stream, msgTopic); task.addObserver (this.msgObserver); tasks.add (attività); executor.submit (task); } –

3

Il metodo consigliato per farlo è quello di avere un pool di thread in modo da Java in grado di gestire l'organizzazione per voi e per ogni flusso del metodo createMessageStreamsByFilter dà si consumano in un Runnable. Per esempio:

int NUMBER_OF_PARTITIONS = 6; 
Properties consumerConfig = new Properties(); 
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181"); 
consumerConfig.put("backoff.increment.ms", "100"); 
consumerConfig.put("autooffset.reset", "largest"); 
consumerConfig.put("groupid", "java-consumer-example"); 
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig)); 

TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic"); 
List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS); 

ExecutorService executor = Executors.newFixedThreadPool(streams.size()); 
for(final KafkaStream<Message> stream: streams){ 
    executor.submit(new Runnable() { 
     public void run() { 
      for (MessageAndMetadata<Message> msgAndMetadata: stream) { 
       ByteBuffer buffer = msgAndMetadata.message().payload(); 
       byte [] bytes = new byte[buffer.remaining()]; 
       buffer.get(bytes); 
       //Do something with the bytes you just got off Kafka. 
      } 
     } 
    }); 
} 

In questo esempio ho chiesto per 6 filetti fondamentalmente perché so che ho 3 partizioni per ogni argomento e ho elencato due temi nel mio whitelist. Una volta che abbiamo gli handle dei flussi in entrata, possiamo scorrere il loro contenuto, che sono oggetti MessageAndMetadata. I metadati sono in realtà solo il nome dell'argomento e l'offset. Come hai scoperto, puoi farlo in un singolo thread se richiedi 1 stream invece di, nel mio esempio 6, ma se richiedi l'elaborazione parallela, il modo migliore è avviare un executor con un thread per ogni flusso restituito.

+0

Cosa succederebbe se lo facessi? kafkaConsumerConfig = new ConsumerConfig (...); consumerConnector = Consumer.createJavaConsumerConnector (kafkaConsumerConfig); topicCountMap.put ("mytopic", 1); consumerMap.get ("MyTopic") get (0).; Verifica che ci sia get (0) in quella lista di stream di kafka, quindi ricevo solo 1 stream. Cosa succede se chiamo Consumer.createJavaConsumerConnector 10 volte? – stewenson

+0

Condivideranno tutti la stessa configurazione e ognuno leggerà tutte le partizioni quindi suppongo che otterresti 10 consumatori, il che proverebbe a salvare il loro stato nello stesso nodo ZK in modo da finire con il consumatore 1 es leggendo i primi messaggi 1K, quindi l'utente 2 leggendo gli stessi messaggi 1K ma potenzialmente il consumatore 1 finirebbe di leggere il suo aggiornamento batch ZK, leggere un secondo, quindi scrivere la sua posizione su ZK, quindi il nostro secondo per qualche motivo il thread più lento entra e scrive la sua posizione torna a ZK e il primo utente rielabora il secondo batch. Fondamentalmente i conflitti a bizzeffe. – feldoh

0
/** 
* @param source : source kStream to sink output-topic 
*/ 
private static void pipe(KStream<String, String> source) { 
    source.to(Serdes.String(), Serdes.String(), new StreamPartitioner<String, String>() { 

     @Override 
     public Integer partition(String arg0, String arg1, int arg2) { 
      return 0; 
     } 
    }, "output-topic"); 
} 

sopra il codice scriverà record alla partizione 1 di nome dell'argomento "output-topic"

Problemi correlati