2015-10-23 8 views
5

Sono nuovo di Spark e sto cercando di inserire una colonna per ogni riga di input con il nome del file da cui proviene.Come aggiungere il nome del file sorgente a ciascuna riga in Spark?

Ho visto gli altri fare una domanda simile, ma tutte le loro risposte utilizzate wholeTextFile, ma sto cercando di farlo per file CSV più grandi (leggi usando la libreria Spark-CSV), file JSON e file Parquet (non solo piccoli file di testo).

posso usare il spark-shell per ottenere un elenco di nomi di file:

val df = sqlContext.read.parquet("/blah/dir") 
val names = df.select(inputFileName()) 
names.show 

ma questo è un dataframe. Non sono sicuro di come aggiungerlo come colonna a ogni riga (e se quel risultato è ordinato uguale ai dati iniziali, anche se presumo che lo sia sempre) e come farlo come soluzione generale per tutti i tipi di input .

+0

perché vuoi/bisogno di questo? –

+1

Ogni record deve mostrare quale file è in origine ... più facile eseguire il debug delle cose quando si conosce l'intero percorso che attraversa (come un file di input non valido) – mcmcmc

risposta

2

Quando si crea un RDD da un file di testo, probabilmente desidera mappare i dati in una classe case, così si potrebbe aggiungere la sorgente di ingresso in quella fase:

case class Person(inputPath: String, name: String, age: Int) 
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt" 
val rdd = sc.textFile(inputPath).map { 
    l => 
     val tokens = l.split(",") 
     Person(inputPath, tokens(0), tokens(1).trim().toInt) 
    } 
rdd.collect().foreach(println) 

Se non si vuole mix "dei dati aziendali" con metadati:

case class InputSourceMetaData(path: String, size: Long) 
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData) 

// Fake the size, for demo purposes only 
val md = InputSourceMetaData(inputPath, size = -1L) 
val rdd = sc.textFile(inputPath).map { 
    l => 
    val tokens = l.split(",") 
    PersonWithMd(tokens(0), tokens(1).trim().toInt, md) 
} 
rdd.collect().foreach(println) 

e se si promuove la RDD ad un dataframe:

import sqlContext.implicits._ 
val df = rdd.toDF() 
df.registerTempTable("x") 

è possibile interrogare piace

sqlContext.sql("select name, metadata from x").show() 
sqlContext.sql("select name, metadata.path from x").show() 
sqlContext.sql("select name, metadata.path, metadata.size from x").show() 

Aggiornamento

Si può leggere i file in HDFS utilizzando org.apache.hadoop.fs.FileSystem.listFiles() in modo ricorsivo.

Dato un elenco di nomi di file in un valore files (collezione standard Scala contenente org.apache.hadoop.fs.LocatedFileStatus), è possibile creare un RDD per ogni file:

val rdds = files.map { f => 
    val md = InputSourceMetaData(f.getPath.toString, f.getLen) 

    sc.textFile(md.path).map { 
    l => 
     val tokens = l.split(",") 
     PersonWithMd(tokens(0), tokens(1).trim().toInt, md) 
    } 
} 

Ora è possibile reduce l'elenco dei RDDs in una sola : La funzione per reduce concats tutte RDDS in una sola:

val rdd = rdds.reduce(_ ++ _) 
rdd.collect().foreach(println) 

questo funziona, ma non posso verificare se questo distribuisce/esegue bene con i file di grandi dimensioni.

+0

Lo apprezzo sicuramente, ma l'unico problema è che devi specificare il file percorso completo e nome file del file di input. Sto solo specificando la directory di input, inserendo tutti i file di input che si trovano in essa. – mcmcmc

+0

Quale funzione stai attualmente utilizzando? È 'wholeTextFiles()'? – Beryllium

+0

Per i file CSV sto usando la libreria databricks/spark-csv 'sqlContext.read.format (" com.databricks.spark.csv "). Load ("/path/dir/")'. Per i file parquet, usando 'sqlContext.read.parquet ("/path/parquetdir/")'. – mcmcmc

Problemi correlati