5

Sto provando a leggere i vecchi messaggi da Kafka con lo streaming di scintille. Tuttavia, sono in grado di recuperare i messaggi solo quando vengono inviati in tempo reale (ad esempio, se compongo nuovi messaggi, mentre il mio programma spark è in esecuzione - quindi ottengo quei messaggi).apache spark streaming - kafka - lettura dei vecchi messaggi

Sto cambiando il mio groupID e consumerID per assicurarmi che Zookeeper non stia semplicemente dando messaggi che conosce il mio programma ha visto prima.

Supponendo che la scintilla veda l'offset in zookeeper come -1, non dovrebbe leggere tutti i vecchi messaggi in coda? Sto semplicemente fraintendendo il modo in cui una coda di kafka può essere usata? Sono molto nuovo a scintilla e kafka, quindi non posso escludere che sto solo fraintendendo qualcosa.

package com.kibblesandbits 

import org.apache.spark.SparkContext 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.kafka.KafkaUtils 

import net.liftweb.json._ 

object KafkaStreamingTest { 

    val cfg = new ConfigLoader().load 
    val zookeeperHost = cfg.zookeeper.host 
    val zookeeperPort = cfg.zookeeper.port 
    val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot 

    implicit val formats = DefaultFormats 

    def parser(json: String): String = { 
    return json 
} 

def main(args : Array[String]) { 
    val zkQuorum = "test-spark02:9092" 

    val group = "myGroup99" 
    val topic = Map("testtopic" -> 1) 
    val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New") 
    val ssc = new StreamingContext(sparkContext, Seconds(3)) 
    val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic) 
    var gp = json_stream.map(_._2).map(parser) 

    gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json") 
    ssc.start() 
} 

Durante l'esecuzione, verrà visualizzato il seguente messaggio. Quindi sono fiducioso che non è solo non vedere i messaggi perché l'offset è impostato.

14/12/05 13:34:08 INFORMAZIONI ConsumerFetcherManager: [ConsumerFetcherManager-1.417.808,045047 millions] fetcher aggiunto per partizioni ArrayBuffer ([[testtopic, 0], initOffset -1 di mediare id: 1 questo, : test-spark02.vpc, porta: 9092], [[testtopic, 1], initOffset -1 all'id broker: 1, host: test-spark02.vpc, porta: 9092], [[testtopic, 2], initOffset -1 al broker id: 1, host: test-spark02.vpc, porta: 9092], [[testtopic, 3], initOffset -1 all'id broker: 1, host: test-spark02.vpc, porta: 9092], [[testtopic, 4], initOffset -1 al broker id: 1, host: test-spark02.vpc, porta: 9092])

Quindi, se vengono inseriti 1000 nuovi messaggi, è possibile visualizzare quei 1000 messaggi salvati nella directory temporanea. Ma non so come leggere i messaggi esistenti, che dovrebbero essere numerati (a questo punto) decine di migliaia.

risposta

8

Utilizzare il metodo factory alternativo sulla KafkaUtils che consente di fornire una configurazione per il consumatore Kafka:

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
     ssc: StreamingContext, 
     kafkaParams: Map[String, String], 
     topics: Map[String, Int], 
     storageLevel: StorageLevel 
    ): ReceiverInputDStream[(K, V)] 

poi costruire una mappa con la configurazione di Kafka e aggiungere il parametro 'kafka.auto.offset.reset' impostato su 'più piccolo':

val kafkaParams = Map[String, String](
     "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, 
     "zookeeper.connection.timeout.ms" -> "10000", 
     "kafka.auto.offset.reset" -> "smallest" 
) 

prevedono che config per il metodo factory sopra. "kafka.auto.offset.reset" -> "più piccolo" indica al consumatore di iniziare dall'offset più piccolo nel tuo argomento.

+0

ancora non ha funzionato per me, c'è un altro modo possibile per farlo? Ho un messaggio 10k che risiede nell'argomento, ma solo in grado di recuperarli quando ottengo un nuovo messaggio nell'argomento. Come ottenere i dati già memorizzati nell'argomento di kafka? –

+0

"auto.offset.reset" -> "il più piccolo" ha funzionato per me. Inoltre, secondo i documenti, https://cwiki.apache.org/confluence/display/KAFKA/FAQ dovrebbe essere "meno recente" se usi la versione 0.9 – Evgenii