2016-06-20 22 views
15

Desidero verificare se il server kafka è in esecuzione o meno prima di avviare processi di produzione e consumo. E 'in ambiente Windows ed ecco il codice del mio server Kafka in Eclipse ...Come verificare se Kafka Server è in esecuzione?

Properties kafka = new Properties(); 
kafka.setProperty("broker.id", "1"); 
kafka.setProperty("port", "9092"); 
kafka.setProperty("log.dirs", "D://workspace//"); 
kafka.setProperty("zookeeper.connect", "localhost:2181");  
Option<String> option = Option.empty(); 
KafkaConfig config = new KafkaConfig(kafka);   
KafkaServer server = new KafkaServer(config, new CurrentTime(), option); 
server.startup(); 

In questo caso if (server != null) non è sufficiente, perché è sempre vero. Quindi c'è un modo per sapere che il mio server kafka è in esecuzione e pronto per il produttore. È necessario che lo verifichi perché causa la perdita di alcuni pacchetti di dati iniziali.

Grazie.

+0

Che sistema operativo stai usando? – Maroun

+0

Ho aggiornato la mia domanda. – Khan

risposta

12

broker Tutti Kafka deve essere assegnato un broker.id. All'avvio un broker creerà un nodo effimero in Zookeeper con un percorso di /broker/ids/$id. Poiché il nodo è effimero, verrà rimosso non appena il broker si disconnette, ad es. chiudendo.

È possibile visualizzare l'elenco dei nodi mediatore effimere in questo modo:

echo dump | nc localhost 2181 | grep brokers

L'interfaccia client ZooKeeper espone una serie di comandi; dump elenca tutte le sessioni e i nodi effimeri per il cluster.

nota, quanto sopra assume:

  • Si sta eseguendo ZooKeeper sulla porta predefinita (2181) su localhost, e che localhost è il leader per il cluster
  • tuo zookeeper.connect Kafka config non lo fa specificare un env chroot per il vostro Kafka grappolo cioè è solo host:port e non host:port/path
0

da Java, si può facilmente effettuare una chiamata a questo comando:

[[email protected] hsperfdata_kafka]$ /usr/bin/kafka status 
Kafka is running with PID=17850. 

Avvolgere in un try-catch. Il blocco delle eccezioni viene eseguito se hai riscontrato problemi con kafka. Usa un 'finally' per ripulire qualsiasi cosa sul tuo programma java.

cercherò di condividere un esempio di codice

+0

puoi condividere un campione qui –

1

risposta di Paolo è molto buona ed è in realtà come Kafka & Zk lavorare insieme da un punto di vista mediatore.

direi che un'altra opzione facile da controllare se un server di Kafka è in esecuzione è quello di creare un semplice KafkaConsumer punta al cluste e tentare qualche azione, per esempio, listTopics().Se il server kafka non è in esecuzione, riceverai un TimeoutException e potrai utilizzare una frase try-catch.

def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = { 
    val props = new Properties() 
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString) 
    props.put("group.id", kafkaParams.get("group.id").get.toString) 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    val simpleConsumer = new KafkaConsumer[String, String](props) 
    simpleConsumer.listTopics() 
    } 
1

La buona opzione è quella di utilizzare AdminClient come sotto prima di iniziare a produrre o consumare i messaggi

private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;   
try (AdminClient client = AdminClient.create(properties)) { 
      client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get(); 
     } catch (ExecutionException ex) { 
      LOG.error("Kafka is not available, timed out after {} ms", ADMIN_CLIENT_TIMEOUT_MS); 
      return; 
     } 
Problemi correlati