Nel mio caso (testare la lettura dei file in produttore e consumare in consolle, il tutto in locale), ho appena visto questo in produttore di uscita:
offset.storage.file.filename=/tmp/connect.offsets
così ho voluto aprirlo, ma è binario, con alcuni personaggi difficilmente riconoscibili.
L'ho eliminato (rinominarlo funziona anche), quindi posso scrivere nello stesso file e ottenere di nuovo il contenuto del file dal consumatore. È necessario riavviare il produttore della console affinché abbia effetto perché tenta di leggere il file di offset, se non è presente, crearne uno nuovo, in modo che l'offset venga ripristinato.
Se si vuole ripristinarlo senza la cancellazione, è possibile utilizzare:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name>
È possibile controllare tutti i nomi dei gruppi da:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
e controllare i dettagli di ogni gruppo:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe
Nell'ambiente di produzione, questo offset è gestito da zookeeper, quindi più passaggi (e c aution) è necessario. È possibile fare riferimento a questa pagina:
https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html
Passi:
kafka-topics --list --zookeeper localhost:2181
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1 // -1 for largest, -2 for smallest
set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}
Come un (cattivo) soluzione, è possibile eliminare il connettore e registrare un nuovo connettore con un nome diverso. Ovviamente, questo ha senso solo quando non devi farlo regolarmente. – pederpansen
[Questo] (https://stackoverflow.com/questions/45670937/kafka-0-11-how-to-reset-offsets) è una bella spiegazione di come modificare gli offset per un gruppo. –