Sto provando a leggere i vecchi messaggi da Kafka con lo streaming di scintille. Tuttavia, sono in grado di recuperare i messaggi solo quando vengono inviati in tempo reale (ad esempio, se compongo nuovi messaggi, mentre il mio programma spark è in esecuzione - quindi ottengo quei messaggi).apache spark streaming - kafka - lettura dei vecchi messaggi
Sto cambiando il mio groupID e consumerID per assicurarmi che Zookeeper non stia semplicemente dando messaggi che conosce il mio programma ha visto prima.
Supponendo che la scintilla veda l'offset in zookeeper come -1, non dovrebbe leggere tutti i vecchi messaggi in coda? Sto semplicemente fraintendendo il modo in cui una coda di kafka può essere usata? Sono molto nuovo a scintilla e kafka, quindi non posso escludere che sto solo fraintendendo qualcosa.
package com.kibblesandbits
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import net.liftweb.json._
object KafkaStreamingTest {
val cfg = new ConfigLoader().load
val zookeeperHost = cfg.zookeeper.host
val zookeeperPort = cfg.zookeeper.port
val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot
implicit val formats = DefaultFormats
def parser(json: String): String = {
return json
}
def main(args : Array[String]) {
val zkQuorum = "test-spark02:9092"
val group = "myGroup99"
val topic = Map("testtopic" -> 1)
val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
val ssc = new StreamingContext(sparkContext, Seconds(3))
val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
var gp = json_stream.map(_._2).map(parser)
gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
ssc.start()
}
Durante l'esecuzione, verrà visualizzato il seguente messaggio. Quindi sono fiducioso che non è solo non vedere i messaggi perché l'offset è impostato.
14/12/05 13:34:08 INFORMAZIONI ConsumerFetcherManager: [ConsumerFetcherManager-1.417.808,045047 millions] fetcher aggiunto per partizioni ArrayBuffer ([[testtopic, 0], initOffset -1 di mediare id: 1 questo, : test-spark02.vpc, porta: 9092], [[testtopic, 1], initOffset -1 all'id broker: 1, host: test-spark02.vpc, porta: 9092], [[testtopic, 2], initOffset -1 al broker id: 1, host: test-spark02.vpc, porta: 9092], [[testtopic, 3], initOffset -1 all'id broker: 1, host: test-spark02.vpc, porta: 9092], [[testtopic, 4], initOffset -1 al broker id: 1, host: test-spark02.vpc, porta: 9092])
Quindi, se vengono inseriti 1000 nuovi messaggi, è possibile visualizzare quei 1000 messaggi salvati nella directory temporanea. Ma non so come leggere i messaggi esistenti, che dovrebbero essere numerati (a questo punto) decine di migliaia.
ancora non ha funzionato per me, c'è un altro modo possibile per farlo? Ho un messaggio 10k che risiede nell'argomento, ma solo in grado di recuperarli quando ottengo un nuovo messaggio nell'argomento. Come ottenere i dati già memorizzati nell'argomento di kafka? –
"auto.offset.reset" -> "il più piccolo" ha funzionato per me. Inoltre, secondo i documenti, https://cwiki.apache.org/confluence/display/KAFKA/FAQ dovrebbe essere "meno recente" se usi la versione 0.9 – Evgenii