Ho un ambiente spark spark con la scintilla 1.2.0 in cui recupero i dati da una cartella locale e ogni volta che trovo un nuovo file aggiunto alla cartella eseguo qualche trasformazione.scrivere un RDD in HDFS in un contesto di spark-streaming
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
Al fine di effettuare la mia analisi sui dati DSTREAM devo trasformarla in un Array
var arr = new ArrayBuffer[String]();
data.foreachRDD {
arr ++= _.collect()
}
Poi ho utilizzare i dati ottenuti per estrarre le informazioni che voglio e di salvarli su HDFS.
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
Da quando ho davvero bisogno di manipolare i dati con un array è impossibile salvare i dati su HDFS con DStream.saveAsTextFiles("...")
(che funziona bene) e devo salvare il RDD ma con questo preocedure ho finalmente avere i file di output vuoti denominati part-00000 ecc ...
Con un arr.foreach(println)
sono in grado di vedere i risultati corretti delle transofmazioni.
Il mio sospetto è che la scintilla tenta in ogni batch di scrivere i dati negli stessi file, eliminando ciò che è stato scritto in precedenza. Ho provato a salvare in una cartella denominata dinamica come myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString())
ma viene creata sempre una sola piegatura e i file di output sono ancora vuoti.
Come posso scrivere un RDD in HDFS in un contesto di streaming spark?
Credo che il problema è che il vostro arr non è disponibile su tutti i lavoratori. Hai provato a trasmettere il tuo arr e poi finalmente lo hai scritto in hdf? –
perché ho bisogno di monitorare una cartella e intercettare tutto il nuovo file caricato e scintilla suoni in streaming come una buona soluzione. Non è una macchina singola ma un cluster di 2 macchine. Ora sto solo scrivendo i file come testo ma in futuro dovrò scrivere i file parquet ed è piuttosto semplice con Spark – drstein
Proverai questo? var arr = new ArrayBuffer [String](); val onda = sc.broadcast (arr) data.foreachRDD { trasmesso ++ = _.collect()} val myRDD = sc.parallelize (in onda) myRDD.saveAsTextFile ("directory HDFS ....") –