Inizio con l'ultimo documento Kafka http://kafka.apache.org/documentation.html. Ma incontro qualche problema quando provo a utilizzare la nuova API Consumer. Ho fatto il lavoro con i seguenti passi:Come utilizzare l'API consumer di Kafka 0.8.2?
1. Aggiungere una nuova dipendenza
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
2. aggiungere le configurazioni
Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"host:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
3. Utilizzare KafkaConsumer API
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");
Tuttavia, quando provo ad interrogare messaggio dal broker, ho ottenuto nulla, ma nulla:
Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
process(records);
else
System.err.println("null");
E poi io so cosa c'è di sbagliato con il consumatore dopo ho controllato il codice sorgente:
@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}
A peggiorare le cose, non riesco a trovare altre informazioni utili sull'API 0.8.2, dal momento che tutti gli usi su Kafka non sono compatibili con l'ultima versione. Qualcuno potrebbe aiutarmi? Molte grazie.
La nuova API di KafkaConsumer sarà disponibile solo in 0.8.3 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan. Apparentemente ci sono alcune implementazioni nel bagagliaio, anche se non ho la minima idea dello stato. Per il momento sto usando la vecchia implementazione consumer. – habsq
Grazie a @habsq. Quindi tra tutte le vecchie API, quale versione è la scelta migliore quando si utilizza Kafka 0.8.x? – Yohn
Sto usando 0.8.1.1, non conosco la scelta "migliore". – habsq