Quando si crea un RDD da un file di testo, probabilmente desidera mappare i dati in una classe case, così si potrebbe aggiungere la sorgente di ingresso in quella fase:
case class Person(inputPath: String, name: String, age: Int)
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map {
l =>
val tokens = l.split(",")
Person(inputPath, tokens(0), tokens(1).trim().toInt)
}
rdd.collect().foreach(println)
Se non si vuole mix "dei dati aziendali" con metadati:
case class InputSourceMetaData(path: String, size: Long)
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData)
// Fake the size, for demo purposes only
val md = InputSourceMetaData(inputPath, size = -1L)
val rdd = sc.textFile(inputPath).map {
l =>
val tokens = l.split(",")
PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
rdd.collect().foreach(println)
e se si promuove la RDD ad un dataframe:
import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")
è possibile interrogare piace
sqlContext.sql("select name, metadata from x").show()
sqlContext.sql("select name, metadata.path from x").show()
sqlContext.sql("select name, metadata.path, metadata.size from x").show()
Aggiornamento
Si può leggere i file in HDFS utilizzando org.apache.hadoop.fs.FileSystem.listFiles()
in modo ricorsivo.
Dato un elenco di nomi di file in un valore files
(collezione standard Scala contenente org.apache.hadoop.fs.LocatedFileStatus
), è possibile creare un RDD per ogni file:
val rdds = files.map { f =>
val md = InputSourceMetaData(f.getPath.toString, f.getLen)
sc.textFile(md.path).map {
l =>
val tokens = l.split(",")
PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
}
Ora è possibile reduce
l'elenco dei RDDs in una sola : La funzione per reduce
concats tutte RDDS in una sola:
val rdd = rdds.reduce(_ ++ _)
rdd.collect().foreach(println)
questo funziona, ma non posso verificare se questo distribuisce/esegue bene con i file di grandi dimensioni.
perché vuoi/bisogno di questo? –
Ogni record deve mostrare quale file è in origine ... più facile eseguire il debug delle cose quando si conosce l'intero percorso che attraversa (come un file di input non valido) – mcmcmc