Il metodo consigliato per farlo è quello di avere un pool di thread in modo da Java in grado di gestire l'organizzazione per voi e per ogni flusso del metodo createMessageStreamsByFilter dà si consumano in un Runnable. Per esempio:
int NUMBER_OF_PARTITIONS = 6;
Properties consumerConfig = new Properties();
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181");
consumerConfig.put("backoff.increment.ms", "100");
consumerConfig.put("autooffset.reset", "largest");
consumerConfig.put("groupid", "java-consumer-example");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic");
List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_PARTITIONS);
ExecutorService executor = Executors.newFixedThreadPool(streams.size());
for(final KafkaStream<Message> stream: streams){
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata<Message> msgAndMetadata: stream) {
ByteBuffer buffer = msgAndMetadata.message().payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
//Do something with the bytes you just got off Kafka.
}
}
});
}
In questo esempio ho chiesto per 6 filetti fondamentalmente perché so che ho 3 partizioni per ogni argomento e ho elencato due temi nel mio whitelist. Una volta che abbiamo gli handle dei flussi in entrata, possiamo scorrere il loro contenuto, che sono oggetti MessageAndMetadata. I metadati sono in realtà solo il nome dell'argomento e l'offset. Come hai scoperto, puoi farlo in un singolo thread se richiedi 1 stream invece di, nel mio esempio 6, ma se richiedi l'elaborazione parallela, il modo migliore è avviare un executor con un thread per ogni flusso restituito.
Utilizzando SimpleConsumer non è un'opzione? –