2015-07-02 16 views
7

Ho un ambiente spark spark con la scintilla 1.2.0 in cui recupero i dati da una cartella locale e ogni volta che trovo un nuovo file aggiunto alla cartella eseguo qualche trasformazione.scrivere un RDD in HDFS in un contesto di spark-streaming

val ssc = new StreamingContext(sc, Seconds(10)) 
val data = ssc.textFileStream(directory) 

Al fine di effettuare la mia analisi sui dati DSTREAM devo trasformarla in un Array

var arr = new ArrayBuffer[String](); 
    data.foreachRDD { 
    arr ++= _.collect() 
} 

Poi ho utilizzare i dati ottenuti per estrarre le informazioni che voglio e di salvarli su HDFS.

val myRDD = sc.parallelize(arr) 
myRDD.saveAsTextFile("hdfs directory....") 

Da quando ho davvero bisogno di manipolare i dati con un array è impossibile salvare i dati su HDFS con DStream.saveAsTextFiles("...") (che funziona bene) e devo salvare il RDD ma con questo preocedure ho finalmente avere i file di output vuoti denominati part-00000 ecc ...

Con un arr.foreach(println) sono in grado di vedere i risultati corretti delle transofmazioni.

Il mio sospetto è che la scintilla tenta in ogni batch di scrivere i dati negli stessi file, eliminando ciò che è stato scritto in precedenza. Ho provato a salvare in una cartella denominata dinamica come myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()) ma viene creata sempre una sola piegatura e i file di output sono ancora vuoti.

Come posso scrivere un RDD in HDFS in un contesto di streaming spark?

+0

Credo che il problema è che il vostro arr non è disponibile su tutti i lavoratori. Hai provato a trasmettere il tuo arr e poi finalmente lo hai scritto in hdf? –

+0

perché ho bisogno di monitorare una cartella e intercettare tutto il nuovo file caricato e scintilla suoni in streaming come una buona soluzione. Non è una macchina singola ma un cluster di 2 macchine. Ora sto solo scrivendo i file come testo ma in futuro dovrò scrivere i file parquet ed è piuttosto semplice con Spark – drstein

+0

Proverai questo? var arr = new ArrayBuffer [String](); val onda = sc.broadcast (arr) data.foreachRDD { trasmesso ++ = _.collect()} val myRDD = sc.parallelize (in onda) myRDD.saveAsTextFile ("directory HDFS ....") –

risposta

5

Si sta utilizzando Spark Streaming in un modo in cui non è stato progettato. Consiglio di abbandonare Spark per il tuo caso d'uso o di adattare il tuo codice in modo che funzioni con Spark. La raccolta dell'array sul driver vanifica lo scopo di utilizzare un motore distribuito e rende la tua app effettivamente single-machine (due macchine causano anche un sovraccarico maggiore rispetto all'elaborazione dei dati su una singola macchina).

Tutto ciò che puoi fare con un array, puoi farlo con Spark. Quindi, esegui i calcoli all'interno dello stream, distribuito sugli operatori e scrivi l'output utilizzando DStream.saveAsTextFiles(). È possibile utilizzare foreachRDD + saveAsParquet(path, overwrite = true) per scrivere su un singolo file.

+0

Grazie, ho capito perfettamente il tuo punto di vista, cercherò di cambiare la logica trasform per utilizzare DStream. Sai se è possibile eseguire lo spark-streaming in ogni batch per salvare i record nello stesso file? In questo momento ottengo una nuova cartella con nuovi file ogni intervallo di batch. – drstein

+1

Sì, con foreachRDD + saveAsParquet c'è un'opzione per sovrascrivere. –

+0

@MariusSoutier puoi aiutarmi con questo 'http: // stackoverflow.com/questions/39363586/issue-while-storing-data-da-spark-streaming-to-cassanadra' – Naresh

2

@vzamboni: Spark 1.5+ dataframes api ha questa caratteristica:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path); 
Problemi correlati