Desidero creare una pipeline di elaborazione dati in AWS per utilizzare infine i dati elaborati per Machine Learning.Unisci file di output CSV con intestazione singola
Possiedo uno script Scala che acquisisce i dati grezzi da S3, li elabora e li scrive su HDFS o anche su S3 con Spark-CSV. Penso di poter utilizzare più file come input se voglio utilizzare lo strumento AWS Machine Learning per la formazione di un modello di previsione. Ma se voglio usare qualcos'altro, presumo che sia meglio se ricevo un singolo file di output CSV.
Attualmente, come io non voglio usare ripartizione (1) né coalesce (1) per il calcolo delle prestazioni, ho usato Hadoop fs -getmerge per il test manuale, ma come si fonde solo il contenuto dei file di output del lavoro, sto riscontrando un piccolo problema. Ho bisogno di una singola riga di intestazioni nel file di dati per la formazione del modello di previsione.
Se si utilizza .option("header","true")
per spark-csv, quindi scrive le intestazioni in ogni file di output e dopo l'unione ho tante righe di intestazioni nei dati quanti file di output. Ma se l'opzione di intestazione è falsa, allora non aggiunge alcuna intestazione.
Ora ho trovato un'opzione per unire i file all'interno dello script di Scala con l'API Hadoop FileUtil.copyMerge
. Ho provato questo in spark-shell
con il codice qui sotto.
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
Ma questa soluzione ancora solo concatena i file uno sopra l'altro e non gestisce le intestazioni. Come posso ottenere un file di output con una sola riga di intestazioni?
Ho anche provato ad aggiungere df.columns.mkString(",")
come ultimo argomento per copyMerge
, ma questo aggiunge le intestazioni ancora più volte, non una volta.
Sono anch'io con lo stesso problema. È stato risolto? –
@senthilkumarp Sfortunatamente no. L'unico modo per ottenere il risultato necessario era usare 'df.coalesce (1) .write.format (" com.databricks.spark.csv "). Option (" header "," true "). Save (resultPath)' che probabilmente non funzionerebbe così bene con i dati più grandi. –
sì anch'io ho usato la stessa coalesce (1) –