2014-11-18 11 views
5

Nella ricerca di come creare un argomento Kafka tramite l'API, ho trovato questo esempio a Scala:Come creare Kafka ZKStringSerializer in Java?

import kafka.admin.AdminUtils 
import kafka.utils.ZKStringSerializer 
import org.I0Itec.zkclient.ZkClient 

// Create a ZooKeeper client 
val sessionTimeoutMs = 10000 
val connectionTimeoutMs = 10000 
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, 
          connectionTimeoutMs, ZKStringSerializer) 

// Create a topic with 8 partitions and a replication factor of 3 
val topicName = "myTopic" 
val numPartitions = 8 
val replicationFactor = 3 
val topicConfig = new Properties 
AdminUtils.createTopic(zkClient, topicName, 
         numPartitions, replicationFactor, topicConfig) 

Fonte: https://stackoverflow.com/a/23360100/871012

L'ultimo arg ZKStringSerializer è apparentemente un oggetto Scala. Non mi è chiaro come far funzionare questo esempio in Java.

Questo post How to create a scala object in clojure chiede la stessa domanda in Clojure e la risposta è stata:

ZKStringSerializer$/MODULE$ 

che in Java sarebbe (credo) si traducono in:

ZKStringSerializer$.MODULE$ 

Ma quando provo che (o qualsiasi numero di altre variazioni) nessuno di loro viene compilato.
L'errore di compilazione è:

KafkaTopicCreator.java:[16,18] cannot find symbol 
symbol: variable ZKStringSerializer$ 
location: class org.sample.KafkaTopicCreator 

Sto usando kafka_2.9.2-0.8.1.1 e Java 8.

risposta

17

per Java provare la seguente,

Prima di importazione di seguito dichiarazione

import kafka.utils.ZKStringSerializer$; 

Creare oggetto per ZkClient nel modo seguente,

String zkHosts = "127.0.0.1:2181"; //If more than one zookeeper then "127.0.0.1:2181,127.0.0.2:2181" 
ZkClient zkClient = new ZkClient(zkHosts, 10000, 10000, ZKStringSerializer$.MODULE$); 
AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties()); 

Il codice di cui sopra non funziona per Kafka> 0.9 in quanto l'API è stato cambiato, Utilizza il codice qui sotto per Kafka> 0,9

import java.util.Properties; 
import kafka.admin.AdminUtils; 
import kafka.utils.ZKStringSerializer$; 
import kafka.utils.ZkUtils; 
import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.ZkConnection; 

public class KafkaTopicCreationInJava 
{ 
    public static void main(String[] args) throws Exception { 
     ZkClient zkClient = null; 
     ZkUtils zkUtils = null; 
     try { 
      String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; 
      int sessionTimeOutInMs = 15 * 1000; // 15 secs 
      int connectionTimeOutInMs = 10 * 1000; // 10 secs 

      zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); 
      zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); 

      String topicName = "testTopic"; 
      int noOfPartitions = 2; 
      int noOfReplication = 3; 
      Properties topicConfiguration = new Properties(); 

      AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); 

     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } finally { 
      if (zkClient != null) { 
       zkClient.close(); 
      } 
     } 
    } 
}