Nel client Java (http://kafka.apache.org/documentation.html#highlevelconsumerapi), commitOffsets sul blocco consumer di alto livello fino a quando gli offset non vengono eseguiti correttamente, oppure è fire-and-forget?Does commitOffsets sul blocco consumer di alto livello?
10
A
risposta
6
Does commitOffsets sul blocco consumer di alto livello fino a quando gli offset non vengono eseguiti correttamente?
Sembra commitOffsets()
loop attraverso ogni consumatore e chiede updatePersistentPath
se il suo offset è cambiato, e se è così scrive dati via zkClient.writeData(path, getBytes(data))
. Sembra che commitOffsets()
compaia nel blocco fino a quando non vengono eseguiti tutti gli offset.
Ecco il codice sorgente per commitOffsets()
(ref):
public void commitOffsets() {
if (zkClient == null) {
logger.error("zk client is null. Cannot commit offsets");
return;
}
for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
for (PartitionTopicInfo info : e.getValue().values()) {
final long lastChanged = info.getConsumedOffsetChanged().get();
if (lastChanged == 0) {
logger.trace("consume offset not changed");
continue;
}
final long newOffset = info.getConsumedOffset();
//path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
try {
ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
} catch (Throwable t) {
logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
} finally {
info.resetComsumedOffsetChanged(lastChanged);
if (logger.isDebugEnabled()) {
logger.debug("Committed [" + path + "] for topic " + info);
}
}
}
}
}
e per updatePersistentPath(...)
(ref):
public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
try {
zkClient.writeData(path, getBytes(data));
} catch (ZkNoNodeException e) {
createParentPath(zkClient, path);
try {
zkClient.createPersistent(path, getBytes(data));
} catch (ZkNodeExistsException e2) {
zkClient.writeData(path, getBytes(data));
}
}
}
Problemi correlati
- 1. Apache Kafka Consumer group e Simple Consumer
- 2. Libreria grafica di alto livello
- 3. Does Collections.unmodifiableList (elenco) richiede un blocco?
- 4. C# è un linguaggio di alto livello?
- 5. di alto livello documentazione relativa alla classe
- 6. Questo approccio lockless consumer-consumer Python è thread-safe?
- 7. Hang di ActiveMQ Consumer
- 8. Linguaggio di programmazione di alto livello per composizione musicale
- 9. Astrazioni di multithreading/concorrenza di alto livello per .NET
- 10. Spiegazione di alto livello della classe di similarità per Lucene?
- 11. Linguaggio di sistema di alto livello che compila in c?
- 12. Blocco livello di classe Java rispetto al blocco del livello oggetto
- 13. libreria ruby di alto livello per la crittografia
- 14. L'interfaccia è il livello più alto di astrazione?
- 15. Rails: errore troppo alto nel livello di stack
- 16. CSS per selezionare il livello più alto di un
- 17. Does DB :: raw influisce quando viene caricato sul server?
- 18. Qual è la differenza tra un'API Java di alto livello e di basso livello?
- 19. kafka consumer 0.9 retrocompatibile?
- 20. BlockingCollection consumer multiplo
- 21. C# Producer/Consumer pattern
- 22. Qualsiasi implementazione di coda libera con blocco singolo produttore single-consumer in C?
- 23. RabbitMQ consumer on demand?
- 24. HDF5 per Python: alto livello vs interfacce di basso livello. h5py
- 25. Devo fare il caching sul livello DAO o sul livello di servizio nell'app web Spring MVC?
- 26. come mantenere gtk.window sempre al livello più alto
- 27. blocco di orientamento in Android a livello di programmazione
- 28. Does thread.yield() perde il blocco sull'oggetto se chiamato all'interno di un metodo sincronizzato?
- 29. C# perché non è possibile sovrascrivere la proprietà statica? (Come possono le classi di alto livello chiamare il metodo della classe base con i dati di alto livello)
- 30. Problemi di Kafka Avro Consumer with Decoder