2016-02-16 19 views
8

Sto usando PySpark per fare un classico lavoro ETL (carica set di dati, elaborarlo, salvarlo) e voglio salvare il mio Dataframe come file/directory partizionata da una colonna "virtuale" ; quello che intendo per "virtuale" è che ho una colonna Timestamp che è una stringa contenente una data codificata ISO 8601 e vorrei partizionare per Anno/Mese/Giorno; ma in realtà non ho una colonna Year, Month o Day in DataFrame; Ho questo Timestamp dal quale posso ricavare queste colonne, ma non voglio che i miei elementi resultat abbiano una di queste colonne serializzate.Spark: salva DataFrame partizionato dalla colonna "virtuale"

struttura del file risultante dal salvataggio del dataframe su disco dovrebbe essere simile:

/ 
    year=2016/ 
     month=01/ 
      day=01/ 
       part-****.gz 

C'è un modo per fare quello che voglio con Spark/Pyspark?

risposta

15

Le colonne utilizzate per il partizionamento non sono incluse nei dati serializzati. Per esempio, se si crea DataFrame in questo modo:

df = sc.parallelize([ 
    (1, "foo", 2.0, "2016-02-16"), 
    (2, "bar", 3.0, "2016-02-16") 
]).toDF(["id", "x", "y", "date"]) 

e scrivere come segue:

import tempfile 
from pyspark.sql.functions import col, dayofmonth, month, year 
outdir = tempfile.mktemp() 

dt = col("date").cast("date") 
fname = [(year, "year"), (month, "month"), (dayofmonth, "day")] 
exprs = [col("*")] + [f(dt).alias(name) for f, name in fname] 

(df 
    .select(*exprs) 
    .write 
    .partitionBy(*(name for _, name in fname)) 
    .format("json") 
    .save(outdir)) 

i singoli file non conterranno colonne di partizione: dati

import os 

(sqlContext.read 
    .json(os.path.join(outdir, "year=2016/month=2/day=16/")) 
    .printSchema()) 

## root 
## |-- date: string (nullable = true) 
## |-- id: long (nullable = true) 
## |-- x: string (nullable = true) 
## |-- y: double (nullable = true) 

partizionamento viene memorizzato solo in una struttura di directory e non duplicati in file serializzati. Verrà aggiunto solo quando l'albero di directory completo o parziale è stato letto:

sqlContext.read.json(outdir).printSchema() 

## root 
## |-- date: string (nullable = true) 
## |-- id: long (nullable = true) 
## |-- x: string (nullable = true) 
## |-- y: double (nullable = true) 
## |-- year: integer (nullable = true) 
## |-- month: integer (nullable = true) 
## |-- day: integer (nullable = true) 

sqlContext.read.json(os.path.join(outdir, "year=2016/month=2/")).printSchema() 

## root 
## |-- date: string (nullable = true) 
## |-- id: long (nullable = true) 
## |-- x: string (nullable = true) 
## |-- y: double (nullable = true) 
## |-- day: integer (nullable = true) 
+0

Sono nuovo in Python. C'è un modo per farlo senza avere l'anno =, mese = e giorno = nel percorso? Capisco la maggior parte di questo – deanw

Problemi correlati