2015-01-28 8 views
7

Sono relativamente nuovo ad Apache Spark e voglio creare un singolo RDD in Python da elenchi di dizionari che vengono salvati in più file JSON (ognuno è gzip e contiene un elenco di dizionari). Il risultante RDD quindi, in parole povere, potrebbe contenere tutte le liste di dizionari combinate in un unico elenco di dizionari. Non sono stato in grado di trovare questo nella documentazione (https://spark.apache.org/docs/1.2.0/api/python/pyspark.html), ma se mi è mancato, fatemelo sapere.Come caricare la directory dei file JSON in Apache Spark in Python

Finora ho provato a leggere i file JSON e creando l'elenco combinato in Python, quindi utilizzando sc.parallelize(), tuttavia l'intero set di dati è troppo grande per entrare in memoria in modo questa non è una soluzione pratica. Sembra che Spark abbia un modo intelligente di gestire questo caso d'uso, ma non ne sono consapevole.

Come posso creare un singolo RDD in Python comprendente gli elenchi in tutti i file JSON?

Vorrei anche ricordare che non voglio usare Spark SQL. Mi piacerebbe usare funzioni come map, filter, ecc., Se possibile.

risposta

5

Dopo quello tgpfeiffer menzionati nella loro risposta e commentare, ecco quello che ho fatto.

In primo luogo, come accennato, i file JSON dovevano essere formattati in modo che avessero un dizionario per riga piuttosto che un singolo elenco di dizionari. Poi, è stato così semplice come:

my_RDD_strings = sc.textFile(path_to_dir_with_JSON_files) 
my_RDD_dictionaries = my_RDD_strings.map(json.loads) 

Se c'è un modo migliore o più efficiente per fare questo, per favore fatemelo sapere, ma questo sembra funzionare.

2

È possibile utilizzare sqlContext.jsonFile() per ottenere uno SchemaRDD (che è un RDD [riga] più uno schema) che può essere quindi utilizzato con Spark SQL. Oppure vedi Loading JSON dataset into Spark, then use filter, map, etc per una pipeline di elaborazione non SQL. Penso che potrebbe essere necessario decomprimere i file, e anche Spark può funzionare solo con file in cui ogni riga è un singolo documento JSON (cioè, nessun oggetto multilinea possibile).

+0

grazie per la risposta. Avrei dovuto dire che non voglio usare Spark SQL, voglio usare una pipeline di elaborazione non SQL come nella domanda a cui fai riferimento. Aggiornerò la mia domanda originale La risposta alla domanda che hai fatto riferimento sembra essere in Scala, non in Python. Grazie ancora per il tuo aiuto! – Brandt

+1

Esatto, è in Scala, ma l'idea può essere applicata al tuo problema: carica il set di dati di input usando 'sparkContext.textFile()' (che in realtà [sembra supportare i file gzippati] (http://stackoverflow.com/questions/16302385/gzip-support-in-spark)), quindi analizza le stringhe con un parser di tua scelta (come [il modulo json] (https://docs.python.org/2/library/json.html)), quindi elaborare come desiderato. – tgpfeiffer

+0

Grazie, ha funzionato! Il passaggio chiave era l'utilizzo della funzione mappa su json.loads. Pubblicherò esattamente quello che ho fatto come risposta. Grazie mille per il vostro aiuto. – Brandt

1

È possibile caricare una directory di file in un singolo RDD utilizzando textFile e supporta anche i caratteri jolly. Questo non ti darebbe nomi di file, ma non sembra che tu li abbia bisogno.

È possibile utilizzare Spark SQL durante l'utilizzo di trasformazioni di base come carta, filtro ecc SchemaRDD è anche un RDD (in Python, così come Scala)

1

Per caricare elenco di JSON da un file come RDD:

def flat_map_json(x): return [each for each in json.loads(x[1])] 
rdd = sc.wholeTextFiles('example.json').flatMap(flat_map_json) 
Problemi correlati