2015-11-17 23 views
9

Ho alcuni test JUnit sul codice che utilizza un argomento kafka. Gli argomenti simulati su kafka che ho provato non funzionano e gli esempi trovati online sono molto vecchi e quindi non funzionano con 0.8.2.1. Come posso creare un argomento di simulazione kafka usando 0.8.2.1?Come posso creare un'istanza di un argomento Mock Kafka per i test di junit?

Per chiarire: sto scegliendo di utilizzare un'istanza reale incorporata dell'argomento per testare con un'istanza reale piuttosto che prendere in giro la mano in mockito. Questo è così che posso testare che i miei codificatori e decodificatori personalizzati funzionano davvero e che non fallisce quando uso una vera istanza di kafka.

risposta

6

https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

Questo esempio è stato aggiornato per lavorare nella nuova versione 0.8.2.2. Ecco il codice snippit con dipendenze Maven:

pom.xml:

<dependencies> 
<dependency> 
    <groupId>junit</groupId> 
    <artifactId>junit</artifactId> 
    <version>4.12</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
</dependency> 
</dependencies> 

KafkaProducerTest.java:

import java.nio.charset.StandardCharsets; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 
import org.I0Itec.zkclient.ZkClient; 
import org.junit.Test; 
import kafka.admin.TopicCommand; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 
import kafka.producer.KeyedMessage; 
import kafka.producer.Producer; 
import kafka.producer.ProducerConfig; 
import kafka.server.KafkaConfig; 
import kafka.server.KafkaServer; 
import kafka.utils.MockTime; 
import kafka.utils.TestUtils; 
import kafka.utils.TestZKUtils; 
import kafka.utils.Time; 
import kafka.utils.ZKStringSerializer$; 
import kafka.zk.EmbeddedZookeeper; 
import static org.junit.Assert.*; 

/** 
* For online documentation 
* see 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala 
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala 
*/ 
public class KafkaProducerTest { 

    private int brokerId = 0; 
    private String topic = "test"; 

    @Test 
    public void producerTest() throws InterruptedException { 

     // setup Zookeeper 
     String zkConnect = TestZKUtils.zookeeperConnect(); 
     EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect); 
     ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$); 

     // setup Broker 
     int port = TestUtils.choosePort(); 
     Properties props = TestUtils.createBrokerConfig(brokerId, port, true); 

     KafkaConfig config = new KafkaConfig(props); 
     Time mock = new MockTime(); 
     KafkaServer kafkaServer = TestUtils.createServer(config, mock); 

     String [] arguments = new String[]{"--topic", topic, "--partitions", "1","--replication-factor", "1"}; 
     // create topic 
     TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments)); 

     List<KafkaServer> servers = new ArrayList<KafkaServer>(); 
     servers.add(kafkaServer); 
     TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000); 

     // setup producer 
     Properties properties = TestUtils.getProducerConfig("localhost:" + port); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     Producer producer = new Producer(producerConfig); 

     // setup simple consumer 
     Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1); 
     ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties)); 

     // send message 
     KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8)); 

     List<KeyedMessage> messages = new ArrayList<KeyedMessage>(); 
     messages.add(data); 

     producer.send(scala.collection.JavaConversions.asScalaBuffer(messages)); 
     producer.close(); 

     // deleting zookeeper information to make sure the consumer starts from the beginning 
     // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka 
     zkClient.delete("/consumers/group0"); 

     // starting consumer 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(topic, 1); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
     KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
     ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 

     if(iterator.hasNext()) { 
      String msg = new String(iterator.next().message(), StandardCharsets.UTF_8); 
      System.out.println(msg); 
      assertEquals("test-message", msg); 
     } else { 
      fail(); 
     } 

     // cleanup 
     consumer.shutdown(); 
     kafkaServer.shutdown(); 
     zkClient.close(); 
     zkServer.shutdown(); 
    } 
} 

Assicuratevi di controllare la vostra dipendenza mvn: albero per eventuali librerie in conflitto. Ho dovuto aggiungere le esclusioni per SLF e log4j:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.11</artifactId> 
    <version>0.8.2.2</version> 
    <classifier>test</classifier> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.2</version> 
    <exclusions> 
     <exclusion> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-log4j12</artifactId> 
     </exclusion> 
     <exclusion> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

Un'altra opzione che sto esaminando sta usando apache curatore: Is it possible to start a zookeeper server instance in process, say for unit tests?

<dependency> 
    <groupId>org.apache.curator</groupId> 
    <artifactId>curator-test</artifactId> 
    <version>2.2.0-incubating</version> 
    <scope>test</scope> 
</dependency> 

TestingServer zkTestServer; 

@Before 
public void startZookeeper() throws Exception { 
    zkTestServer = new TestingServer(2181); 
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000)); 
} 

@After 
public void stopZookeeper() throws IOException { 
    cli.close(); 
    zkTestServer.stop(); 
} 
+0

puoi per favore fornire codice funzionante per la versione 0.11.0.2. Sopra il codice non funziona – dhroove

2

Hai provato a deridere gli oggetti di consumo di kafka usando una struttura beffarda come Mockito?

+0

avrei preferito una versione finta di Kafka quindi so i produttori e i consumatori ci stanno lavorando. Ci sono alcuni esempi qui e là online (es: https://ransilberman.wordpress.com/2013/07/19/how-to-unit-test-kafka). Tuttavia, sono per versioni precedenti quindi non funziona più con 0.8.2.1. – Chip

Problemi correlati