2016-05-22 19 views
5

Sono nuovo di Kafka. Ho creato un produttore java sul mio computer locale e ho impostato un broker Kafka su un'altra macchina, ad esempio M2, sulla rete (posso eseguire il ping, SSH, connettermi a questa macchina). Dal lato Producer nella console Eclipse ottengo "Messaggio inviato". Ma quando controllo la console consumer sulla macchina M2 non riesco a vedere quei messaggi.Kafka: nessun messaggio visualizzato sul consumatore della console dopo il messaggio inviato da Java Producer

mio java codice del produttore è:

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 


import java.util.HashMap; 
import java.util.Map; 

public class KafkaMessageProducer { 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 

     KafkaMessageProducer reportObj = new KafkaMessageProducer(); 
     reportObj.send(); 

    } 

    public void send(){ 

     Map<String, Object> config = new HashMap<String, Object>(); 
     config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "135.113.133.60:9092"); 
     config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config); 
     int maxMessages = 5; 
     int count = 0; 
     while(count < maxMessages){ 
      producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #"+count++)); 
      System.out.println("Message send.."+count); 
     } 
     producer.close(); 
    } 

} 

Potete per favore fatemelo sapere dove sto andando male? Posso inviare messaggi localmente sulla macchina M2 dal produttore della console. Nota: anche quando cambio l'indirizzo IP con il nome host completo di Kafka Broker ha ancora lo stesso problema.

Aggiornamento: Penso anche che il produttore sia in grado di connettersi al broker Kafka e inviare i messaggi, ma il broker Kafka non trasmette questi messaggi al consumatore. Se cambio l'indirizzo IP o la porta in Zookeeper (che è in esecuzione sullo stesso nodo di Kafka Broker) e guardo il log di Zookeeper, ottiene il ping Producer e quindi rifiuta la sessione.

Update2: ho creato un jar Producer e ho eseguito questo jar sulla macchina M2 e ha funzionato. Quindi sembra che ci sia qualcosa di sbagliato nel modo in cui Producer tenta di connettersi al broker Kafka. Non sono ancora sicuro di quale sia il problema.

+0

L'utente della console era attivo e in esecuzione prima che il produttore inviasse il messaggio? Hai provato a leggere dall'inizio dell'argomento? – yuyang

+0

Sì. Il consumatore della console era attivo e funzionante. L'ho provato anche dopo che il produttore ha inviato il messaggio. Sto leggendo il consumatore della console del modulo dall'inizio del tema. – user2441441

+0

Non correlato: ho trovato che l'altra domanda non vale così tanti downvotes. Quindi sto dando un risarcimento qui ;-) – GhostCat

risposta

3

Ho finalmente trovato la risposta e sto postando qui nel caso in cui qualcun altro abbia lo stesso problema. Usa il broker Kafka impostando advertised.hostname quando stai provando a connetterti da remoto. Questo ha funzionato per me.

+0

si prega di elaborare un po 'di più, puoi per favore fornire l'intera linea di configurazione che hai aggiunto/modificato. (nel modo in cui hai descritto la domanda, descrivi anche la tua risposta). Aspettando la tua risposta e grazie in anticipo. –

+1

@DynamicRemo Nel \ Kafka \ config \ server.properties è necessario aggiungere questo: 'port = 9092 advertised.host.name = localhost' Si noti che localhost viene utilizzato perché non sto utilizzando alcun m/c remoto. –

1

È possibile provare a utilizzare il codice come segue per leggere le informazioni sui metadati per l'argomento kafka per verificare se il broker ha ricevuto i messaggi. Questo può aiutare nel debug.

SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), 100000, 
     64 * 1024, "your_group_id"); 
List<String> topics = new ArrayList<>(); 
topics.add(topic); 
TopicMetadataRequest req = new TopicMetadataRequest(topics); 

TopicMetadataResponse resp = simpleConsumer.send(req); 
if (resp.topicsMetadata().size() != 1) { 
    throw new RuntimeException("Expected one metadata for topic " 
     + topic + " found " + resp.topicsMetadata().size()); 
} 

TopicMetadata topicMetaData = resp.topicsMetadata().get(0); 
+0

Dove posso eseguire questo? Sulla mia macchina o macchina locale M2? – user2441441

+0

è possibile eseguire questo da qualsiasi host in grado di parlare con il broker kafka. – yuyang

+0

Un'altra domanda: dove menziono sopra i dettagli della Macchina M2, se eseguo questo sulla mia macchina locale ?? – user2441441

2

Proprio come idea per il debug - cercare producer.send(/* record */).get(); Cioè, attendere il risultato dal Future restituito dal metodo send(). Potrebbe essere che ci sia un'eccezione dal lato del produttore e viene semplicemente ignorata in background.

+0

Dopo averlo fatto rimane bloccato. L'unica cosa stampata sulla console è l'avviso: log4j: WARN Non è stato possibile trovare appendici per il logger (org.apache.kafka.clients.producer.ProducerConfig). log4j: WARN Si prega di inizializzare correttamente il sistema log4j. log4j: WARN Vedi http://logging.apache.org/log4j/1.2/faq.html#noconfig per maggiori informazioni. – user2441441

Problemi correlati