2016-06-09 31 views
6

Come posso ottenere, impostare o resettare l'offset di un connettore/task/sink Kafka Connect?Offset di Kafka Connect. Preparatevi?

Posso utilizzare lo strumento /usr/bin/kafka-consumer-groups che esegue kafka.admin.ConsumerGroupCommand per visualizzare gli offset per tutti i miei normali gruppi di consumatori Kafka. Tuttavia, le attività ei gruppi di Kafka Connect non vengono visualizzati con questo strumento.

Analogamente, posso utilizzare la shell di zookeeper per connettersi a Zookeeper e posso vedere voci di zookeeper per gruppi di utenti regolari di Kafka, ma non per i sink di Kafka Connect.

+0

Come un (cattivo) soluzione, è possibile eliminare il connettore e registrare un nuovo connettore con un nome diverso. Ovviamente, questo ha senso solo quando non devi farlo regolarmente. – pederpansen

+0

[Questo] (https://stackoverflow.com/questions/45670937/kafka-0-11-how-to-reset-offsets) è una bella spiegazione di come modificare gli offset per un gruppo. –

risposta

7

A partire da 0.10.0.0, Connect non fornisce un'API per la gestione degli offset. È qualcosa che vogliamo migliorare in futuro, ma non ancora presente. Lo ConsumerGroupCommand sarebbe lo strumento giusto per gestire gli offset dei connettori sink. Nota che gli offset del connettore sorgente sono memorizzati in un argomento di offset speciale per Connect (non sono come i normali offset di Kafka dal momento che sono definiti dal sistema sorgente, vedi offset.storage.topic nello worker configuration docs) e poiché i connettori sink utilizzano il nuovo consumatore, hanno vinto " t memorizzare i loro offset in Zookeeper: tutti i client moderni utilizzano lo storage offset nativo basato su Kafka. ConsumerGroupCommand può funzionare con questi offset, è sufficiente passare l'opzione --new-consumer).

+1

Qualsiasi ETA o piani per questo per favore? Può vedere https://issues.apache.org/jira/browse/KAFKA-4107 anche senza progresso o dettagli. – SemanticBeeng

2

Non è possibile impostare offset, ma è possibile utilizzare lo strumento kafka-consumer-groups.sh per "scorrere" il feed in avanti.

Il gruppo di consumatori del connettore ha un nome di connect-*CONNECTOR NAME*, ma è possibile fare doppio controllare: unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list

Visualizzazione corrente di offset: unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe

Per spostare l'offset in avanti: unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null

Suppongo che sia possibile spostare l'offset anche cancellando prima il gruppo di consumatori, usando il flag --delete.

Non dimenticare di mettere in pausa e riattivare il connettore tramite l'API REST di Kafka Connect.

0

Nel mio caso (testare la lettura dei file in produttore e consumare in consolle, il tutto in locale), ho appena visto questo in produttore di uscita:

offset.storage.file.filename=/tmp/connect.offsets 

così ho voluto aprirlo, ma è binario, con alcuni personaggi difficilmente riconoscibili.

L'ho eliminato (rinominarlo funziona anche), quindi posso scrivere nello stesso file e ottenere di nuovo il contenuto del file dal consumatore. È necessario riavviare il produttore della console affinché abbia effetto perché tenta di leggere il file di offset, se non è presente, crearne uno nuovo, in modo che l'offset venga ripristinato.

Se si vuole ripristinarlo senza la cancellazione, è possibile utilizzare:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name> 

È possibile controllare tutti i nomi dei gruppi da:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 

e controllare i dettagli di ogni gruppo:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe 

Nell'ambiente di produzione, questo offset è gestito da zookeeper, quindi più passaggi (e c aution) è necessario. È possibile fare riferimento a questa pagina:

https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html

Passi:

kafka-topics --list --zookeeper localhost:2181 
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1 // -1 for largest, -2 for smallest 

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset} 
Problemi correlati