2016-06-27 38 views
13

Desidero creare una pipeline di elaborazione dati in AWS per utilizzare infine i dati elaborati per Machine Learning.Unisci file di output CSV con intestazione singola

Possiedo uno script Scala che acquisisce i dati grezzi da S3, li elabora e li scrive su HDFS o anche su S3 con Spark-CSV. Penso di poter utilizzare più file come input se voglio utilizzare lo strumento AWS Machine Learning per la formazione di un modello di previsione. Ma se voglio usare qualcos'altro, presumo che sia meglio se ricevo un singolo file di output CSV.

Attualmente, come io non voglio usare ripartizione (1)coalesce (1) per il calcolo delle prestazioni, ho usato Hadoop fs -getmerge per il test manuale, ma come si fonde solo il contenuto dei file di output del lavoro, sto riscontrando un piccolo problema. Ho bisogno di una singola riga di intestazioni nel file di dati per la formazione del modello di previsione.

Se si utilizza .option("header","true") per spark-csv, quindi scrive le intestazioni in ogni file di output e dopo l'unione ho tante righe di intestazioni nei dati quanti file di output. Ma se l'opzione di intestazione è falsa, allora non aggiunge alcuna intestazione.

Ora ho trovato un'opzione per unire i file all'interno dello script di Scala con l'API Hadoop FileUtil.copyMerge. Ho provato questo in spark-shell con il codice qui sotto.

import org.apache.hadoop.fs.FileUtil 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
val configuration = new Configuration(); 
val fs = FileSystem.get(configuration); 
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "") 

Ma questa soluzione ancora solo concatena i file uno sopra l'altro e non gestisce le intestazioni. Come posso ottenere un file di output con una sola riga di intestazioni?

Ho anche provato ad aggiungere df.columns.mkString(",") come ultimo argomento per copyMerge, ma questo aggiunge le intestazioni ancora più volte, non una volta.

+0

Sono anch'io con lo stesso problema. È stato risolto? –

+0

@senthilkumarp Sfortunatamente no. L'unico modo per ottenere il risultato necessario era usare 'df.coalesce (1) .write.format (" com.databricks.spark.csv "). Option (" header "," true "). Save (resultPath)' che probabilmente non funzionerebbe così bene con i dati più grandi. –

+0

sì anch'io ho usato la stessa coalesce (1) –

risposta

0

Per unire i file in una cartella in un unico file:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs._ 

def merge(srcPath: String, dstPath: String): Unit = { 
    val hadoopConfig = new Configuration() 
    val hdfs = FileSystem.get(hadoopConfig) 
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null) 
} 

Se si desidera unire tutti i file in un unico file, ma ancora nel stessa cartella (ma questo porta tutti i dati al nodo conducente):

dataFrame 
     .coalesce(1) 
     .write 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .save(out) 

Un'altra soluzione potrebbe essere quella di usare la soluzione # 2 quindi spostare il un file all'interno della cartella ad un altro percorso (con il nome della nostra CSV file).

def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = { 
    val tmpDir = "tmpDir" 

    df.repartition(1) 
     .write 
     .format("com.databricks.spark.csv") 
     .option("header", header.toString) 
     .option("delimiter", sep) 
     .save(tmpDir) 

    val dir = new File(tmpDir) 
    val tmpCsvFile = tmpDir + File.separatorChar + "part-00000" 
    (new File(tmpCsvFile)).renameTo(new File(fileName)) 

    dir.listFiles.foreach(f => f.delete) 
    dir.delete 
}