2015-11-11 13 views
7

Come leggere parquet partizionato con condizioni come dataframe,Lettura dataframe da file parquet partizionata

questo funziona bene,

val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=25/*") 

partizione è lì per day=1 to day=30 è possibile leggere qualcosa di simile (day = 5 to 6) o day=5,day=6,

val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=??/*") 

Se metto * mi dà tutti i 30 giorni di dati ed è troppo grande.

risposta

28

sqlContext.read.parquet può assumere più percorsi come input. Se si desidera solo day=5 e day=6, si può semplicemente aggiungere due percorsi come:

val dataframe = sqlContext 
     .read.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/", 
        "file:///your/path/data=jDD/year=2015/month=10/day=6/") 

Se si dispone di cartelle in day=X, come dicono country=XX, country verrà automaticamente aggiunto come una colonna nel dataframe.

MODIFICA: A partire da Spark 1.6 è necessario fornire un'opzione "base" per consentire a Spark di generare automaticamente le colonne. Nel Spark 1.6.x quanto sopra avrebbe dovuto essere riscritto come questo per creare un dataframe con le colonne "dati", "anno", "mese" e "giorno":

val dataframe = sqlContext 
    .read 
    .option("basePath", "file:///your/path/") 
    .parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/", 
        "file:///your/path/data=jDD/year=2015/month=10/day=6/") 
+0

Innanzitutto grazie per la risposta, che stavo cercando il modo più semplice. Nel caso in cui circa 20 giorni come sottoinsieme in questo modo sarà un po 'difficile. Vorrei filtrare spesso per verificare l'accuratezza dei dati. – WoodChopper

+0

Allora perché non fare semplicemente 'val dataframe = sqlContext.read.parquet (" file: /// tuo/percorso/dati = jDD/anno = 2015/mese = 10/")? 'day' viene aggiunto come colonna nel dataframe, che è possibile filtrare. –

+0

In realtà, i dati sono davvero enormi. I dati vanno dal 2007 al 2015. Su una media di 5 miliardi di righe di log grezzi vengono elaborati e archiviati. Mi verrebbe richiesto un particolare rapporto dati su richiesta – WoodChopper

4

è necessario per fornire l'opzione mergeSchema = true. come indicato di seguito (questo è da 1.6.0):

val dataframe = sqlContext.read.option("mergeSchema", "true").parquet("file:///your/path/data=jDD") 

Questo leggerà tutti i file in parquet in dataframe e crea anche le colonne anno, mese e giorno nei dati dataframe.

Rif: https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#schema-merging

+0

La fusione degli schemi è necessaria solo se gli schemi sono diversi, se sono uguali, non è necessario. – mightymephisto