2016-07-08 19 views
5

Ho un singolo nodo, multi (3) setup di Zookeeper/Kafka broker. Sto usando il client Java 0.10 di Kafka.Kafka 0.10 Java Client TimeoutException: batch contenente 1 record (i) scaduti

ho scritto seguendo semplici remoto (su un server diverso da Kafka) Produttore (nel codice ho sostituito il mio indirizzo IP pubblico con MyIP):

Properties config = new Properties(); 
try { 
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094"); 
    config.put(ProducerConfig.ACKS_CONFIG, "all"); 
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); 
    producer = new KafkaProducer<String, byte[]>(config); 
    Schema.Parser parser = new Schema.Parser(); 
    schema = parser.parse(GATEWAY_SCHEMA); 
    recordInjection = GenericAvroCodecs.toBinary(schema); 
    GenericData.Record avroRecord = new GenericData.Record(schema); 
    //Filling in avroRecord (code not here) 
    byte[] bytes = recordInjection.apply(avroRecord); 

    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes)); 
    RecordMetadata data = future.get(); 
} catch (Exception e) { 
    e.printStackTrace(); 
} 

mie proprietà del server per i 3 broker simile a questa (nei 3 file di proprietà del server differenti broker.id è 0, 1, 2 e gli ascoltatori è PLAINTEXT: //: 9092, PLAINTEXT: //: 9093, PLAINTEXT: //: 9094 e host.name è 10.2.0.4, 10.2. 0.5, 10.2.0.6). Questo è il primo file proprietà del server:

broker.id=0 
listeners=PLAINTEXT://:9092 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
log.dirs=/tmp/kafka1-logs 
num.partitions=1 
num.recovery.threads.per.data.dir=1 
log.retention.hours=168 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=6000 

Quando eseguo il codice, vengo seguente eccezione:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65) 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52) 
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) 
    at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212) 
    at com.nr.roles.gateway.gw.service(gw.java:126) 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) 
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821) 
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583) 
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158) 
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511) 
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090) 
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) 
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109) 
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119) 
    at org.eclipse.jetty.server.Server.handle(Server.java:517) 
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308) 
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242) 
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261) 
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95) 
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75) 
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213) 
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654) 
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0 

Qualcuno sa che cosa mi manca? Qualsiasi aiuto sarebbe apprezzato. Grazie mille

+0

Ho anche provato a fare lo stesso come sopra, ma con un solo broker (sulla porta 9092). Ho ancora la stessa identica eccezione. Mi sono assicurato che le porte broker e zookeeper sulla macchina remota fossero aperte e posso telnetle dalla macchina Producer. – Armen

risposta

1

Le informazioni sulla porta nella configurazione BOOTSTRAP_SERVERS_CONFIG non sono corrette (MYIP:).

Come indicato in server.properties come "PLAINTEXT: //: 9093, PLAINTEXT: //: 9093, PLAINTEXT: //: 9094".

+0

scusate, ho avuto un errore di battitura qui, in server.properties è "PLAINTEXT: //: 9092, PLAINTEXT: //: 9093, PLAINTEXT: //: 9094". Quindi le porte 'BOOTSTRAP_SERVERS_CONFIG' sono corrette. – Armen

3

Ho riscontrato gli stessi problemi.

È necessario modificare kafka server.properties per specificare l'indirizzo IP. esempio:

PLAINTEXT: // YOUIP: 9093

se non, Kafka utilizzerà hostname, se il produttore non può ottenere il padrone di casa, che non è possibile inviare un messaggio a Kafka, anche se è possibile telnet.

+0

ho la mia proprietà impostata direttamente nel mio server.properties come, ascoltatori = PLAINTEXT: // nome_dominio: 9092, ma otteniamo comunque questa eccezione org.apache.kafka.common.errors.TimeoutException: batch scaduto java.util.concurrent. ExecutionException, durante la connessione da un server esterno, ho aumentato request.timeout.ms \t anche a valori più alti. –

+0

non risponde apparentemente alla domanda dell'OP ... – Paul

0

This risposta condivide alcune intuizioni. È possibile aumentare la configurazione del produttore request.timeout.ms che consentirà al client di accodare i lotti più a lungo prima di scadere.

Si potrebbe anche voler esaminare le configurazioni e linger.ms e trovare l'ottimale che funzioni nel vostro caso.

Problemi correlati