2014-06-23 13 views
59

Quando si utilizza Scala in Spark, ogni volta che si esegue il dump dei risultati utilizzando saveAsTextFile, sembra dividere l'output in più parti. Sto solo passando un parametro (percorso) ad esso.come rendere saveAsTextFile NON dividere l'output in più file?

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) 
year.saveAsTextFile("year") 
  1. fa il numero di uscite corrispondono al numero di riduttori che utilizza?
  2. Significa che l'uscita è compressa?
  3. So che posso combinare l'output utilizzando bash, ma esiste un'opzione per archiviare l'output in un singolo file di testo, senza dividere ?? Ho guardato i documenti API, ma non dice molto su questo.
+1

È generalmente una cattiva prassi utilizzare un solo file in Big Data se il file è di grandi dimensioni. – samthebest

+0

Qual è la procedura migliore se l'output fosse, ad esempio, un file ordinato? Mantienilo come una raccolta di file e fai in modo che i molti nomi di file di output siano una sorta di indice (ad esempio qualcosa come il primo file è denominato "aa", quelli centrali sono come "fg", l'ultimo "zzy")? – Rdesmond

+0

Spesso accade che un intenso lavoro di scintilla genera solo un output molto piccolo (aggregazione, kpis, popolarità, ...) che viene prodotto su hdf, ma molto probabilmente usato da applicazioni non correlate ai big data. Più pulito e più facile in questo caso avere un singolo file con nome per trasferimenti e consumi. –

risposta

84

Il motivo per cui lo salva come file multipli è perché il calcolo è distribuito. Se l'uscita è abbastanza piccolo tale che si pensa di poter montare su una macchina, allora si può terminare il programma con

val arr = year.collect() 

e quindi salvare la matrice risultante come un file, Un altro modo sarebbe quello di utilizzare un costume partizionatore, partitionBy, e fare in modo che tutto vada a una partizione anche se non è consigliabile perché non si otterrà alcuna parallelizzazione.

Se è necessario salvare il file con saveAsTextFile, è possibile utilizzare coalesce(1,true).saveAsTextFile(). Ciò significa fondamentalmente che il calcolo si fonde in una partizione. Puoi anche utilizzare repartition(1) che è solo un wrapper per coalesce con l'argomento shuffle impostato su true. Guardando attraverso la fonte di RDD.scala è come ho capito la maggior parte di questa roba, dovresti dare un'occhiata.

+1

come si salva un array come file di testo ?? non esiste una funzione saveAsTextFile per un array. solo per RDD. – user2773013

+2

@ user2773013 beh l'approccio sarebbe "coalesce" o l'approccio 'partition' che ho suggerito, ma non c'è davvero alcun punto in cui archiviare su hdf se è solo su 1 nodo ed è per questo che usare collect è davvero il modo corretto di andare – aaronman

+1

grazie @aaronman !!! – user2773013

16

È possibile chiamare coalesce(1) e quindi saveAsTextFile() - ma potrebbe essere una cattiva idea se si dispone di molti dati. File separati per divisione sono generati proprio come in Hadoop per consentire a mappatori e riduttori separati di scrivere su file diversi. Avere un singolo file di output è solo una buona idea se si hanno pochissimi dati, nel qual caso si potrebbe anche fare collect(), come ha detto @aronman.

+0

Nice non pensava di essere più "pulito" di fare casino con il partizionatore, dicendo che continuo a pensare se il tuo obiettivo è quello di farlo in un file 'collect' è probabilmente il modo giusto per farlo – aaronman

+1

questo funziona. Ma, se usi coalescenza, significa che stai usando solo un riduttore. Questo non rallenterebbe il processo perché si usa solo un riduttore ?? – user2773013

+1

Sì, ma è quello che stai chiedendo. Spark restituisce un file per partizione.D'altra parte, perché ti importa del numero di file? Quando si leggono i file nella scintilla, è sufficiente specificare la directory principale e tutte le partizioni vengono lette come singolo RDD – David

2

Si sarà in grado di farlo nella prossima versione di Spark, nella versione corrente 1.0.0 non è possibile a meno che non lo si faccia manualmente in qualche modo, ad esempio, come hai detto, con una chiamata di script bash.

+0

grazie per le informazioni! – user2773013

+0

la prossima versione di Spark è qui e non è ovvio come farlo :( –

1

Vorrei anche ricordare che la documentazione indica chiaramente che gli utenti devono prestare attenzione quando chiamano in coalesce con un numero limitato di partizioni. questo può far sì che le partizioni upstream ereditino questo numero di partizioni.

Non consiglio l'uso di coalizione (1) a meno che non sia strettamente necessario.

2

In Spark 1.6.1 il formato è come mostrato di seguito. Crea un singolo file di output. È consigliabile utilizzarlo se l'output è abbastanza piccolo da gestire. Fondamentalmente ciò che fa è che restituisce un nuovo RDD che viene ridotto in partizioni con numPartition. Se si esegue una drastica coalesce, per esempio a numPartitions = 1, questo può comportare la computazione che avrà luogo il minor numero di nodi che ti piace (per esempio, un nodo nel caso di numPartitions = 1)

pair_result.coalesce(1).saveAsTextFile("/app/data/") 
2

Come altri hanno detto, è possibile raccogliere o fondersi i dati impostato per forzare Spark a produrre un singolo file. Ma questo limita anche il numero di attività Spark che possono funzionare sul set di dati in parallelo.Preferisco lasciare che crei un centinaio di file nella directory di output HDFS, quindi utilizzare hadoop fs -getmerge /hdfs/dir /local/file.txt per estrarre i risultati in un singolo file nel filesystem locale. Questo ha più senso quando il tuo output è un report relativamente piccolo, ovviamente.

0

Ecco la mia risposta per generare un singolo file. Ho appena aggiunto coalesce(1)

val year = sc.textFile("apat63_99.txt") 
       .map(_.split(",")(1)) 
       .flatMap(_.split(",")) 
       .map((_,1)) 
       .reduceByKey((_+_)).map(_.swap) 
year.saveAsTextFile("year") 

Codice:

year.coalesce(1).saveAsTextFile("year") 
1

È possibile chiamare repartition() e seguire in questo modo:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap) 

var repartitioned = year.repartition(1) 
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00") 

enter image description here

1

Per chi lavora con un set di dati più grande e ancora disposto a trarre profitto dal parallelismo dello , rdd.coalesce(1).saveAsTextFile("path")non è la soluzione. L'intera pipeline (dall'ultima azione spark alla memorizzazione) verrà eseguita su 1 executor.

È possibile invece primo eseguire il gasdotto su qualunque numero di esecutori e utilizzare saveAsTextFile (che produrrà diversi file in output) e poi unire solo tutti questi file utilizzando apache FileSystem api.

Il seguente metodo è data la RDD per memorizzare e il percorso dove memorizzarlo:

import org.apache.spark.rdd.RDD 
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} 
import org.apache.hadoop.conf.Configuration 

def saveAsSingleTextFile(
    outputRDD: RDD[String], 
    outputFile: String 
): Unit = { 

    // Classic saveAsTextFile in a temporary folder: 
    outputRDD.saveAsTextFile(outputFile + ".tmp") 

    // The facility allowing file manipulations on hdfs: 
    val hdfs = FileSystem.get(new Configuration()) 

    // Merge the folder into a single file: 
    FileUtil.copyMerge(
    hdfs, 
    new Path(outputFile + ".tmp"), 
    hdfs, 
    new Path(outputFile), 
    true, 
    new Configuration(), 
    null) 

    // And we delete the intermediate folder: 
    hdfs.delete(new Path(outputFile + ".tmp"), true) 
} 

questo modo l'elaborazione è ancora distribuito e la parte fusione avviene in seguito, che limita la perdita di prestazioni.

In bonus è possibile fornire il nome esatto del file di output, contrariamente a rdd.coalesce (1) .saveAsTextFile ("mio/percorso") che produce il file my/percorso/parte-00000.

Problemi correlati