5

Sono nuovo di Kafka 0.9 e testando alcune funzionalità ho realizzato uno strano comportamento nel consumer implementato in Java (KafkaConsumer).Il metodo poll() di Kafka Consumer viene bloccato

Il broker Kafka si trova in una macchina esterna Ambari.

Anche se potessi implementare un Producer e iniziare a inviare messaggi al broker esterno, non ho idea del perché, quando il consumatore cerca di leggere gli eventi (sondaggio), rimane bloccato.

So che il produttore sta funzionando bene, dato che posso consumare messaggi attraverso il consumatore della console (che lavora localmente su ambari). Ma quando eseguo il consumer Java, non succede nulla, resta bloccato. Debug il codice che ho potuto vedere che si blocca alla linea poll():

ConsumerRecords<String, String> records = consumer.poll(100); 

Il timeout non fa nulla, tra l'altro. Non importa se inserisci 0, 100 o 1000 ms, il consumatore viene bloccato in questa riga e non esegue il timeout né genera eccezioni.

ho provato tutti i tipi di proprietà alternativi, come advertised.host.name, advertised.listener, ... e così via, con la fortuna zero.

Qualsiasi aiuto sarebbe molto apprezzato. Grazie in anticipo!

+0

Sei in grado di consumare i messaggi in un modo diverso, ad esempio utilizzando 'kafka-console-consumer.sh'? –

+0

Sì, lo sono. Dalla macchina che ospita l'ambari, posso consumare messaggi attraverso l'utente della console –

+0

E che dire della macchina su cui usi il tuo consumatore? Hai provato il consumatore della console lì? –

risposta

1

Motivo potrebbe essere la macchina su cui è in esecuzione il codice utente non è in grado di connettersi a Zookeeper. Prova a eseguire lo stesso codice utente sulla macchina su cui è installato Kafka (l'ho provato e ha funzionato per me). Ho anche risolto il problema menzionando le proprietà sottostanti nel file server.properties: advertised.host.name="ip address which you want to expose" // nel mio caso è l'ip pubblico della macchina ec2, ho kafka e il guardiano dello zoo installati sulla stessa ec2. advertised.port=9092 ConsumerRecords<String, String> records = consumer.poll(100); L'istruzione sopra riportata non significa che il consumatore scadrà dopo 100 ms, è il periodo di polling. Qualunque dato acquisito in 100 ms viene letto nella raccolta dei record.

Problemi correlati