2016-03-22 15 views
10

ripartiziono un dataframe come segue:Prevent DataFrame.partitionBy() di rimuovere le colonne partizionati da schema

df.write.partitionBy("type", "category").parquet(config.outpath) 

Il codice dà i risultati attesi (dati partizionati cioè per tipo & categoria). Tuttavia, le colonne "tipo" e "categoria" vengono rimosse dai dati/dallo schema. C'è un modo per prevenire questo comportamento?

+0

non è che un punto? Tutti i dati richiesti sono ancora codificati nella struttura delle directory, quindi non c'è perdita di dati. Se vuoi un valore-per-file potresti provare 'df.repartition (" type "," category "). Write (...)' ma non otterrai una buona struttura. – zero323

+0

@ zero323: sì, sono d'accordo che non ci sono perdite di dati. Tuttavia, il recupero delle colonne utilizzate per il partizionamento non è banale per alcuni casi d'uso. Ad esempio, se voglio caricare i dati nel maiale, come posso recuperare il tipo e le colonne della categoria? – Michael

+0

Non ho usato Pig per un po '. 'ParquetLoader' non capisce la struttura fuori dalla scatola? – zero323

risposta

8

Posso pensare a una soluzione alternativa, che è piuttosto zoppa, ma funziona.

import spark.implicits._ 

val duplicated = df.withColumn("_type", $"type").withColumn("_category", $"category") 
duplicated.write.partitionBy("_type", "_category").parquet(config.outpath) 

che sto rispondere a questa domanda, nella speranza che qualcuno avrebbe una risposta migliore o una spiegazione di quello che ho (se OP ha trovato una soluzione migliore), anche se, dal momento che ho la stessa domanda.

+1

A dire il vero, non mi sembra così zoppo. Sembra l'approccio migliore dato il comportamento di 'partitionBy()'. – Michael

1

In generale, la risposta di Ivan è un bel rancore. MA ...

Se si sta leggendo e scrivendo rigorosamente nella scintilla, è possibile utilizzare l'opzione basePath durante la lettura dei dati.

https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#partition-discovery

Per percorso di passaggio/a/tavolo per sia SparkSession.read.parquet o SparkSession.read.load, Spark SQL estrarrà automaticamente le informazioni di partizionamento dai percorsi.

Esempio:

 val dataset = spark 
     .read 
     .format("parquet") 
     .option("basePath", hdfsInputBasePath) 
     .load(hdfsInputPath) 
Problemi correlati