Sono un nuovo studente che studia Kafka e mi sono imbattuto in alcuni problemi fondamentali con la comprensione di più consumatori che articoli, documentazioni, ecc. Non sono stati troppo utili finora.Come utilizzo più utenti in Kafka?
Una cosa che ho cercato di fare è scrivere il mio produttore e consumatore Kafka di alto livello e gestirli simultaneamente, pubblicando 100 semplici messaggi su un argomento e chiedendoli al mio consumatore di recuperarli. Sono riuscito a farlo con successo, ma quando provo a introdurre un secondo consumatore a consumare dallo stesso argomento in cui i messaggi sono stati appena pubblicati, non riceve alcun messaggio.
Era a mia conoscenza che per ogni argomento potevi avere consumatori da gruppi separati di consumatori e ognuno di questi gruppi di consumatori avrebbe ricevuto una copia completa dei messaggi prodotti su qualche argomento. È corretto? In caso contrario, quale sarebbe il modo corretto per me di creare più consumatori? Questa è la classe di consumatori che ho scritto finora:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
Inoltre, ho notato che in origine stavo testando il consumo al di sopra di un 'test' argomento con una sola partizione. Quando ho aggiunto un altro consumatore a un gruppo di consumatori esistente che diceva "testGroup", questo ha innescato un riequilibrio di Kafka che ha rallentato la latenza del mio consumo di una quantità significativa, nell'ordine dei secondi. Ho pensato che si trattasse di un problema di ribilanciamento poiché avevo un'unica partizione, ma quando ho creato un nuovo argomento "multiplepartitions" con 6 partizioni, sono emersi problemi simili in cui l'aggiunta di più utenti allo stesso gruppo di consumatori ha causato problemi di latenza. Mi sono guardato intorno e la gente mi sta dicendo che dovrei usare un consumatore multi-thread: qualcuno può far luce su questo?
C'è un grande esempio di un consumatore di alto livello [qui] (https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example) per kafka '0.8.1'. – chrsblck
@chrsblck grazie per il collegamento.L'ho già esaminato in precedenza e probabilmente non l'ho capito come avrei potuto - potresti forse spiegare un po 'come quell'esempio fa uso dei thread? Al momento non capisco cosa stiano facendo. –
Un modo è quello di avere lo stesso numero di thread delle partizioni per un determinato argomento. Dall'articolo - Grab una lista di stream 'List> stream = consumerMap.get (topic);' ... Quindi assegna ad ogni thread una partizione 'executor.submit (new ConsumerTest (stream, threadNumber)) '. –
chrsblck