Utilizzo la versione beta di Kafka 0.8 e sto solo cercando di mandare oggetti diversi, serializzarli con il mio codificatore e inviarli a una configurazione di broker esistente. Per ora sto cercando di far funzionare DefaultEncoder.L'encoder predefinito Apache Kafka non funziona
Ho il broker e tutto il setup e il funzionamento per StringEncoder, ma non sono in grado di ottenere nessun altro tipo di dati, compreso solo il byte puro [], per essere inviato e ricevuto dal broker.
mio codice del produttore è:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
public class ProducerTest {
public static void main(String[] args) {
long events = 5;
Random rnd = new Random();
rnd.setSeed(new Date().getTime());
Properties props = new Properties();
props.setProperty("metadata.broker.list", "localhost:9093,localhost:9094");
props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
props.setProperty("partitioner.class", "example.producer.SimplePartitioner");
props.setProperty("request.required.acks", "1");
props.setProperty("producer.type", "async");
props.setProperty("batch.num.messages", "4");
ProducerConfig config = new ProducerConfig(props);
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
byte[] a = "Hello".getBytes();
byte[] b = "There".getBytes();
KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>("page_visits", a, b);
producer.send(data);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.close();
}
}
ho usato lo stesso SimplePartitioner come nell'esempio dato here, e sostituendo tutte le matrici di byte da archi e cambiando il serializzatore per kafka.serializer.StringEncoder funziona perfettamente .
Per riferimento, SimplePartitioner:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner<String> {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(String key, int a_numPartitions) {
int partition = 0;
int offset = key.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(key.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
Che cosa sto facendo di sbagliato?
Probabilmente dovresti limitarti al valore predefinito di kafka.producer.DefaultPartitioner per la tua proprietà partitioner.class invece di hardcoding un particolare valore di ritorno per un partizionatore. – gazarsgo
Questo era inteso come test drive. Ma, ecco uno scenario in cui il partizionatore predefinito non funzionerebbe: Supponiamo che si desideri che una particolare sottosequenza dei messaggi venga consumata rigorosamente nell'ordine in cui vengono prodotti. Questo fallirebbe miseramente se si utilizza il partizionatore di default perché il default userebbe semplicemente l'hash della chiave, che è imprevedibile. Invece, se scrivi il tuo partizionatore personalizzato e c'è un modo per rilevare la sottosequenza, possiamo assegnarli alla stessa partizione. Questo esatto caso d'uso si è verificato nella mia domanda. –