2015-06-16 14 views
17

Inizio con l'ultimo documento Kafka http://kafka.apache.org/documentation.html. Ma incontro qualche problema quando provo a utilizzare la nuova API Consumer. Ho fatto il lavoro con i seguenti passi:Come utilizzare l'API consumer di Kafka 0.8.2?

1. Aggiungere una nuova dipendenza

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.1</version> 
</dependency> 

2. aggiungere le configurazioni

Map<String, Object> config = new HashMap<String, Object>(); 
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      "host:9092"); 
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); 
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class.getName()); 
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class.getName()); 
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range"); 

3. Utilizzare KafkaConsumer API

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config); 
consumer.subscribe("topic"); 

Tuttavia, quando provo ad interrogare messaggio dal broker, ho ottenuto nulla, ma nulla:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0); 
if (records != null) 
    process(records); 
else 
    System.err.println("null"); 

E poi io so cosa c'è di sbagliato con il consumatore dopo ho controllato il codice sorgente:

@Override 
public Map<String, ConsumerRecords<K,V>> poll(long timeout) { 
    // TODO Auto-generated method stub 
    return null; 
} 

A peggiorare le cose, non riesco a trovare altre informazioni utili sull'API 0.8.2, dal momento che tutti gli usi su Kafka non sono compatibili con l'ultima versione. Qualcuno potrebbe aiutarmi? Molte grazie.

+3

La nuova API di KafkaConsumer sarà disponibile solo in 0.8.3 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan. Apparentemente ci sono alcune implementazioni nel bagagliaio, anche se non ho la minima idea dello stato. Per il momento sto usando la vecchia implementazione consumer. – habsq

+0

Grazie a @habsq. Quindi tra tutte le vecchie API, quale versione è la scelta migliore quando si utilizza Kafka 0.8.x? – Yohn

+0

Sto usando 0.8.1.1, non conosco la scelta "migliore". – habsq

risposta

1

Sto anche cercando di scrivere un Consumer su Kafka 0.8.2.1 per leggere i messaggi prodotti dal nuovo Producer.

Finora quello che ho ottenuto è che l'API di Producer è pronta e utilizzabile, mentre da parte dei consumatori dobbiamo aspettare 0.8.3, come notato da @habsq e hai già scoperto che c'è del codice incluso per Consumatore, ma non è ancora funzionante.

Quindi il client da utilizzare (l'attuale API client) è quello trovato nel progetto "core" della versione corrente di Kafka, vale a dire 0.8.2.1 (meglio non eseguire il downgrade del client a qualsiasi altra versione).

Quindi per ora abbiamo bisogno di importare due vasi: uno per i "nuovi" client java e uno per il progetto principale, a seconda anche della versione di scala che si sta utilizzando (io uso 2.11).

Nel mio caso io uso graddle a gestire le dipendenze quindi ho solo bisogno di importare

dependencies { 
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1' 
    compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1' 
} 

Quando si aggiornano le dipendenze otterrà tutte le librerie necessarie.

Se si utilizza una versione di Scala diversa, è sufficiente modificare la versione; in ogni caso è possibile trovare tutte le diverse versioni o il pom pom su central central: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

Se si utilizzano tali implementazioni Consumer, tutti gli esempi attuali dovrebbero funzionare come al solito.

PS ref: Kafka-utenti ml filo http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

+0

Inserisco esplicitamente le due dipendenze, poiché in futuro avremo solo bisogno del primo sul jar "clients" e di eliminare quello su "core" kafka. In realtà ora il client è anche una dipendenza dal core, quindi chi vuole essere conciso può semplicemente usare la dipendenza dal core kafka. – RobMcZag

0

Sì Posso confermare che 0.8.2.1 rilascio ha avuto problemi nel consumo di messaggi. Ora facendo un semplice utente con Java/Groovy e rilascio 0.10.1.0, tutto funziona perfettamente.

Non è più necessario impostare PARTITION_ASSIGNMENT_STRATEGY.

Problemi correlati