2016-02-16 25 views
11

Come posso ottenere il numero di partizioni per qualsiasi argomento di kafka dal codice. Ho ricercato molti collegamenti ma nessuno sembra funzionare.kafka ottiene il conteggio delle partizioni per un argomento

Citando alcuni:

http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api

http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic

http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic

che sembrano simili discussioni.

Inoltre ci sono collegamenti simili su SO che non hanno una soluzione funzionante a questo.

+0

Quale versione di Kafka? –

+0

kafka_2.9.2-0.8.2.2 – vish4071

risposta

20

Vai alla directory kafka/bin.

Quindi eseguire questo:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name

Dovreste vedere che cosa avete bisogno in PartitionCount.

Topic:topic_name  PartitionCount:5  ReplicationFactor:1  Configs: 
     Topic: topic_name  Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001 
     Topic: topic_name  Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001 
     Topic: topic_name  Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001 
     Topic: topic_name  Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001 
     Topic: topic_name  Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001 
+0

Oh ... intendevi dal codice, ad esempio da alcune API Java. –

+0

Ho bisogno di ottenere questo dal codice ... – vish4071

+0

Sì ... come da qualche API ... – vish4071

6

Nel API 0,82 Produttore e 0,9 API dei consumatori si può usare qualcosa di simile

Properties configProperties = new Properties(); 
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); 
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); 
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); 

org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); 
producer.partitionsFor("test") 
+0

Questo presuppone che sto usando KafkaConsumer per il mio consumatore, ma sto usando ConsumerConnector per questo. refer: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example – vish4071

2

Ecco come lo faccio:

/** 
    * Retrieves list of all partitions IDs of the given {@code topic}. 
    * 
    * @param topic 
    * @param seedBrokers List of known brokers of a Kafka cluster 
    * @return list of partitions or empty list if none found 
    */ 
    public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) { 
    List<Integer> partitions = new ArrayList<>(); 
    for (BrokerInfo seed : seedBrokers) { 
     SimpleConsumer consumer = null; 
     try { 
     consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup"); 
     List<String> topics = Collections.singletonList(topic); 
     TopicMetadataRequest req = new TopicMetadataRequest(topics); 
     kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); 

     // find our partition's metadata 
     List<TopicMetadata> metaData = resp.topicsMetadata(); 
     for (TopicMetadata item : metaData) { 
      for (PartitionMetadata part : item.partitionsMetadata()) { 
      partitions.add(part.partitionId()); 
      } 
     } 
     break; // leave on first successful broker (every broker has this info) 
     } catch (Exception e) { 
     // try all available brokers, so just report error and go to next one 
     LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e); 
     } finally { 
     if (consumer != null) 
      consumer.close(); 
     } 
    } 
    return partitions; 
    } 

Si noti che ho solo bisogno di tirare fuori partizione ID, ma puoi anche recuperare altri metadati della partizione, ad esempio leader, isr, replicas, ...
E BrokerInfo è solo un semplice POJO con campi host e port.

0

@ La risposta di Sunil-patil non ha risposto al numero di conteggi. Dovete ottenere la dimensione della lista

producer.partitionsFor ("test"). Size()

@ vish4071 nessun punto cozzare Sunil, non ha menzionato che si sta utilizzando ConsumerConnector nella domanda.

2

Quindi il seguente approccio funziona per kafka 0.10 e non utilizza API di produttori o consumer. Usa alcune classi dalla scala API in kafka come ZkConnection e ZkUtils.

ZkConnection zkConnection = new ZkConnection(zkConnect); 
    ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false); 
    System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
     JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size()); 
0

Ho avuto lo stesso problema, in cui avevo bisogno di ottenere le partizioni per un argomento.

Con l'aiuto della risposta here sono riuscito a ottenere le informazioni da Zookeeper.

Ecco il mio codice a Scala (ma potrebbero essere facilmente tradotta in Java)

import org.apache.zookeeper.ZooKeeper 

def extractPartitionNumberForTopic(topicName: String, zookeeperQurom: String): Int = { 
    val zk = new ZooKeeper(zookeeperQurom, 10000, null); 
    val zkNodeName = s"/brokers/topics/$topicName/partitions" 
    val numPartitions = zk.getChildren(zkNodeName, false).size 
    zk.close() 
    numPartitions 
} 

Usando questo approccio mi ha permesso di accedere alle informazioni su argomenti Kafka, nonché altre informazioni broker Kafka ...

Da Zookeeper si potrebbe verificare il numero di partizioni per un argomento si naviga a /brokers/topics/MY_TOPIC_NAME/partitions

Utilizzando zookeeper-client.sh per la connessione al Zookeeper:

[zk: ZkServer:2181(CONNECTED) 5] ls /brokers/topics/MY_TOPIC_NAME/partitions 
[0, 1, 2] 

Questo ci mostra che ci sono 3 partizioni per l'argomento MY_TOPIC_NAME

0

È possibile esplorare lo kafka.utils.ZkUtils che ha molti metodi volti ad aiutare a estrarre i metadati a sul gruppo. Le risposte qui sono belle quindi sono semplicemente aggiungendo per il bene della diversità:

import kafka.utils.ZkUtils 
import org.I0Itec.zkclient.ZkClient 

def getTopicPartitionCount(zookeeperQuorum: String, topic: String): Int = { 
    val client = new ZkClient(zookeeperQuorum) 
    val partitionCount = ZkUtils.getAllPartitions(client) 
    .count(topicPartitionPair => topicPartitionPair.topic == topic) 

    client.close 
    partitionCount 
} 
0

sotto cmd shell può stampare il numero di partizioni. si dovrebbe essere nella directory kafka bin prima di eseguire il cmd

sh kafka-topics.sh --describe --zookeeper localhost: 2181 --topic Nome argomento | awk '{print $ 2}' | uniq -c | awk 'NR == 2 {print "conte di partizioni =" $ 1}'

si prega di modificare il nome del tema base alle vostre necessità

si può convalidare ulteriormente questo usando se condizione così sh kafka-topics.sh --describe --zookeeper localhost: 2181 --topic TopicName | awk '{print $ 2}' | uniq -c | awk 'NR == 2 {if ($ 1 == "16") stampa "partizioni valide"}'

il cmd sopra stampa partizioni valide se il conteggio è 16. puoi cambiare il conteggio in base al tuo fabbisogno.

0
cluster.availablePartitionsForTopic(topicName).size() 
Problemi correlati