2013-09-26 21 views
5

Utilizzo la versione beta di Kafka 0.8 e sto solo cercando di mandare oggetti diversi, serializzarli con il mio codificatore e inviarli a una configurazione di broker esistente. Per ora sto cercando di far funzionare DefaultEncoder.L'encoder predefinito Apache Kafka non funziona

Ho il broker e tutto il setup e il funzionamento per StringEncoder, ma non sono in grado di ottenere nessun altro tipo di dati, compreso solo il byte puro [], per essere inviato e ricevuto dal broker.

mio codice del produttore è:

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 


public class ProducerTest { 
    public static void main(String[] args) { 
     long events = 5; 
     Random rnd = new Random(); 
     rnd.setSeed(new Date().getTime()); 
     Properties props = new Properties(); 
     props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094"); 
     props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); 
     props.setProperty("partitioner.class", "example.producer.SimplePartitioner"); 
     props.setProperty("request.required.acks", "1"); 
     props.setProperty("producer.type", "async"); 
     props.setProperty("batch.num.messages", "4"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config); 
     for (long nEvents = 0; nEvents < events; nEvents++) { 
      byte[] a = "Hello".getBytes(); 
      byte[] b = "There".getBytes(); 

      KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b); 
      producer.send(data); 
     } 
     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     producer.close(); 
    } 
} 

ho usato lo stesso SimplePartitioner come nell'esempio dato here, e sostituendo tutte le matrici di byte da archi e cambiando il serializzatore per kafka.serializer.StringEncoder funziona perfettamente .

Per riferimento, SimplePartitioner:

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner<String> { 
    public SimplePartitioner (VerifiableProperties props) { 

    } 

    public int partition(String key, int a_numPartitions) { 
     int partition = 0; 
     int offset = key.lastIndexOf('.'); 
     if (offset > 0) { 
      partition = Integer.parseInt(key.substring(offset+1)) % a_numPartitions; 
     } 
     return partition; 
    } 

} 

Che cosa sto facendo di sbagliato?

risposta

6

La risposta è che la classe di partizione SimplePartitioner è applicabile solo per le stringhe. Quando provo a eseguire il Producer in modo asincrono, crea un thread separato che gestisce la codifica e il partizionamento prima di inviarlo al broker. Questo thread attiva un roadblock quando realizza che SimplePartitioner funziona solo per le stringhe, ma poiché si tratta di un thread separato, non vengono generate eccezioni e quindi il thread viene chiuso senza alcuna indicazione di errori.

Se cambiamo il SimplePartitioner di accettare byte [], per esempio:

import kafka.producer.Partitioner; 
import kafka.utils.VerifiableProperties; 

public class SimplePartitioner implements Partitioner<byte[]> { 
    public SimplePartitioner (VerifiableProperties props) { 

    } 

    public int partition(byte[] key, int a_numPartitions) { 
     int partition = 0; 
     return partition; 
    } 

} 

Questo funziona perfettamente ora.

+0

Probabilmente dovresti limitarti al valore predefinito di kafka.producer.DefaultPartitioner per la tua proprietà partitioner.class invece di hardcoding un particolare valore di ritorno per un partizionatore. – gazarsgo

+0

Questo era inteso come test drive. Ma, ecco uno scenario in cui il partizionatore predefinito non funzionerebbe: Supponiamo che si desideri che una particolare sottosequenza dei messaggi venga consumata rigorosamente nell'ordine in cui vengono prodotti. Questo fallirebbe miseramente se si utilizza il partizionatore di default perché il default userebbe semplicemente l'hash della chiave, che è imprevedibile. Invece, se scrivi il tuo partizionatore personalizzato e c'è un modo per rilevare la sottosequenza, possiamo assegnarli alla stessa partizione. Questo esatto caso d'uso si è verificato nella mia domanda. –

Problemi correlati