2016-02-29 16 views
5

Attualmente sto usando Kafka 0.9.0.1. Secondo alcune fonti che ho trovato, il modo per impostare le dimensioni dei messaggi è modificare i seguenti valori chiave in server.properties.Come si imposta la dimensione dei messaggi in Kafka?

  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

Il mio file server.properties ha in realtà queste impostazioni.

message.max.bytes=10485760 
replica.fetch.max.bytes=20971520 
fetch.message.max.bytes=10485760 

Altre impostazioni che possono essere rilevanti sono di seguito.

socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 

Tuttavia, quando si tenta di inviare messaggi con carichi utili da 4 a 6 MB, il consumatore non riceve alcun messaggio. Il produttore sembra inviare i messaggi senza alcuna eccezione. Se faccio inviare payload più piccoli (come < 1 MB), il consumatore riceve effettivamente i messaggi.

Qualche idea su cosa sto facendo male in termini di impostazioni di configurazione?

Ecco il codice di esempio per inviare un messaggio.

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps()); 
File dir = new File("/path/to/dir"); 
for(String s : dir.list()) { 
    File f = new File(dir, s); 
    byte[] data = Files.readAllBytes(f.toPath()); 
    Payload payload = new Payload(data); //a simple pojo to store payload 
    String key = String.valueOf(System.currentTimeMillis()); 
    byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[] 
    producer.send(new ProducerRecord<>("test", key, val)); 
} 
producer.close(); 

Ecco il codice di esempio per ricevere un messaggio.

KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps()); 
consumer.subscribe(Arrays.asList("test")); 
while(true) { 
    ConsumerRecord<String, byte[]> records = consumer.poll(100); 
    for(ConsumerRecord<String, byte[]> record : records) { 
    long offset = record.offset(); 
    String key = record.key(); 
    byte[] val = record.value(); 
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object 
    System.out.println(
     System.format("offset=%d, key=%s", offset, key)); 
    } 
} 

Ecco i metodi per popolare i file delle proprietà per il produttore e il consumatore.

public static Properties getProducerProps() { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 1); 
    props.put("buffer.memory", 33554432); 
    props.put("compression.type", "snappy"); 
    props.put("max.request.size", 10485760); //need this 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    return props; 
} 

public static Properties getConsumerProps() { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("max.partition.fetch.bytes", 10485760); //need this too 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); 
    return props; 
} 

risposta

7

Jane, Non utilizzare fetch.message.max.bytes prima di tutto perché è una proprietà che da consumatore e non va nel file server.properties e secondo perché è per la vecchia versione del consumatore, invece utilizzare max.partition.fetch.bytes quando si crea il consumer come parte delle proprietà che si utilizzano per istanziarlo.

+0

Ho appena provato, ma ottengo lo stesso effetto. I file "grandi" non vengono ricevuti. Mi chiedo se vengano addirittura inviati, perché quando il consumatore inizia a leggere dall'argomento, gli offset sono contigui (ad esempio 1, 2, 3, ecc.). Per me, sembra che il produttore non possa nemmeno inviare i file di grandi dimensioni? –

+0

Si scopre che ho bisogno di impostare sia 'max.request.size' per il produttore sia' max.partition.fetch.bytes' per il consumatore. Mi armerò un po 'con il codice per vedere se 'max.partition.fetch.bytes' è veramente necessario. –

+0

Sì, risulta che ho bisogno di entrambe le impostazioni. Se non imposto 'max.partition.fetch.bytes', ottengo un 'RecordTooLargeException'. –

Problemi correlati