Nella mia applicazione di streaming Spark, voglio mappare un valore basato su un dizionario recuperato da un backend (ElasticSearch). Voglio periodicamente aggiornare periodicamente il dizionario, nel caso sia stato aggiornato nel back-end. Sarebbe simile alla capacità di aggiornamento periodico del filtro Logstash translate. Come posso ottenere questo risultato con Spark (ad esempio, in qualche modo, l'RDD non funziona ogni 30 secondi)?Spark Streaming: come aggiornare periodicamente il RDD memorizzato nella cache?
6
A
risposta
5
Il modo migliore che ho trovato per farlo è quello di ricreare l'RDD e mantenere un riferimento mutabile ad esso. Spark Streaming è al centro di un framework di programmazione su Spark. Possiamo piggy-back sullo scheduler per fare in modo che l'RDD sia aggiornato periodicamente. Per questo, usiamo un DSTREAM vuoto che noi programmiamo solo per l'operazione di aggiornamento:
def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data
val dstream = ??? // our data stream
// a dstream of empty data
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval))
var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD{_ =>
// evict the old RDD from memory and recreate it
referenceData.unpersist(true)
referenceData = getData()
referenceData.cache()
}
val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...
In passato, ho anche provato solo con interleaving cache()
e unpersist()
senza risultato (rinfresca solo una volta). Ricreare l'RDD rimuove tutto il lignaggio e fornisce un carico pulito dei nuovi dati.
Problemi correlati
- 1. Spark Caching: RDD Solo l'8% nella cache
- 2. Spark streaming DStream RDD per ottenere il nome file
- 3. Spark Streaming: foreachRDD aggiorna il mio mongo RDD
- 4. Come posso verificare se il mio RDD o dataframe è memorizzato nella cache o no?
- 5. Spark Streaming mapWithState sembra ricostruire periodicamente lo stato completo
- 6. Persistendo Spark uscita Streaming
- 7. Oggetto memorizzato nella cache nella console?
- 8. elemento memorizzato nella cache non in scadenza
- 9. Prestazioni Nginx Fastcgi_cache - Disco memorizzato nella cache VS tmpfs memorizzato nella cache VS file statico
- 10. RDD Aggregate in spark
- 11. Spark RDD checkpoint persistito RDDs/cache sta eseguendo il DAG due volte
- 12. Perché window.name viene memorizzato nella cache?
- 13. Database Magento L'IP è memorizzato nella cache
- 14. Come aggiornare/rimuovere un elemento già memorizzato nella cache all'interno di una collezione di oggetti
- 15. Spark Streaming UpdateStateByKey
- 16. Spark SQL: Convertire RDD [GenericData.Record] per dataframe
- 17. C# HttpRuntime.Cache.Insert() Non contiene il valore memorizzato nella cache
- 18. Lettura da Cassandra con Spark Streaming
- 19. RDD partizionamento in streaming scintilla
- 20. Come creare Spark RDD da un iteratore?
- 21. Streaming di Spark Streaming Kafka
- 22. Apache Spark: Come posso convertire uno Spark DataFrame in un RDD con tipo RDD [(Tipo1, Tipo2, ...)]?
- 23. Come condividere Spark RDD tra 2 contesti Spark?
- 24. Come ottenere l'ennesima riga di Spark RDD?
- 25. Treat Spark RDD come plain Seq
- 26. Spark Streaming stato storico
- 27. Elaborazione in corso in Spark Streaming
- 28. Streaming Spark: sicurezza dell'applicazione
- 29. Risultati intermedi nella cache nella pipeline Spark ML
- 30. Apache filtro Spark RDD in due RDDs
Esiste un'alternativa java per ConstantInputDStream? – user2100493
È garantito che i lavori di aggiornamento di referenceData (attivati da getData()) avvengono sempre prima che i lavori di businessDataDStream siano pianificati? Avremo uno scenario in cui l'aggiornamento referenceData sta accadendo quando sono programmati i lavori rdd.join (referenceData) ?? – Cheeko
@maasg Come programmare la chiamata 'getData()'? Dalla domanda, 'ogni 30 secondi'? –