2016-06-05 16 views
6

Nella mia applicazione di streaming Spark, voglio mappare un valore basato su un dizionario recuperato da un backend (ElasticSearch). Voglio periodicamente aggiornare periodicamente il dizionario, nel caso sia stato aggiornato nel back-end. Sarebbe simile alla capacità di aggiornamento periodico del filtro Logstash translate. Come posso ottenere questo risultato con Spark (ad esempio, in qualche modo, l'RDD non funziona ogni 30 secondi)?Spark Streaming: come aggiornare periodicamente il RDD memorizzato nella cache?

risposta

5

Il modo migliore che ho trovato per farlo è quello di ricreare l'RDD e mantenere un riferimento mutabile ad esso. Spark Streaming è al centro di un framework di programmazione su Spark. Possiamo piggy-back sullo scheduler per fare in modo che l'RDD sia aggiornato periodicamente. Per questo, usiamo un DSTREAM vuoto che noi programmiamo solo per l'operazione di aggiornamento:

def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data 
val dstream = ??? // our data stream 

// a dstream of empty data 
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval)) 

var referenceData = getData() 
referenceData.cache() 
refreshDstream.foreachRDD{_ => 
    // evict the old RDD from memory and recreate it 
    referenceData.unpersist(true) 
    referenceData = getData() 
    referenceData.cache() 
} 

val myBusinessData = dstream.transform(rdd => rdd.join(referenceData)) 
... etc ... 

In passato, ho anche provato solo con interleaving cache() e unpersist() senza risultato (rinfresca solo una volta). Ricreare l'RDD rimuove tutto il lignaggio e fornisce un carico pulito dei nuovi dati.

+0

Esiste un'alternativa java per ConstantInputDStream? – user2100493

+0

È garantito che i lavori di aggiornamento di referenceData (attivati ​​da getData()) avvengono sempre prima che i lavori di businessDataDStream siano pianificati? Avremo uno scenario in cui l'aggiornamento referenceData sta accadendo quando sono programmati i lavori rdd.join (referenceData) ?? – Cheeko

+0

@maasg Come programmare la chiamata 'getData()'? Dalla domanda, 'ogni 30 secondi'? –

Problemi correlati