2016-05-27 13 views
7

Ho un semplice dataframe come questo:colonna pivot stringa su Pyspark dataframe

rdd = sc.parallelize(
    [ 
     (0, "A", 223,"201603", "PORT"), 
     (0, "A", 22,"201602", "PORT"), 
     (0, "A", 422,"201601", "DOCK"), 
     (1,"B", 3213,"201602", "DOCK"), 
     (1,"B", 3213,"201601", "PORT"), 
     (2,"C", 2321,"201601", "DOCK") 
    ] 
) 
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"]) 

df_data.show() 
+---+----+----+------+----+ 
| id|type|cost| date|ship| 
+---+----+----+------+----+ 
| 0| A| 223|201603|PORT| 
| 0| A| 22|201602|PORT| 
| 0| A| 422|201601|DOCK| 
| 1| B|3213|201602|DOCK| 
| 1| B|3213|201601|PORT| 
| 2| C|2321|201601|DOCK| 
+---+----+----+------+----+ 

e ho bisogno di pivot per data:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show() 

+---+----+------+------+------+ 
| id|type|201601|201602|201603| 
+---+----+------+------+------+ 
| 2| C|2321.0| null| null| 
| 0| A| 422.0| 22.0| 223.0| 
| 1| B|3213.0|3213.0| null| 
+---+----+------+------+------+ 

Tutto funziona come previsto. Ma ora ho bisogno di girare e ottenere una colonna non numerico:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show() 

e, naturalmente, vorrei avere un'eccezione:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;' 

Vorrei generare qualcosa sulla linea di

+---+----+------+------+------+ 
| id|type|201601|201602|201603| 
+---+----+------+------+------+ 
| 2| C|DOCK | null| null| 
| 0| A| DOCK | PORT| DOCK| 
| 1| B|DOCK |PORT | null| 
+---+----+------+------+------+ 

E 'possibile con pivot?

risposta

10

Supponendo che (id |type | date) combinazioni sono unici e il vostro unico obiettivo è orientabile e non l'aggregazione è possibile utilizzare first (o qualsiasi altra funzione non limitato ai valori numerici):

from pyspark.sql.functions import first 

(df_data 
    .groupby(df_data.id, df_data.type) 
    .pivot("date") 
    .agg(first("ship")) 
    .show()) 

## +---+----+------+------+------+ 
## | id|type|201601|201602|201603| 
## +---+----+------+------+------+ 
## | 2| C| DOCK| null| null| 
## | 0| A| DOCK| PORT| PORT| 
## | 1| B| PORT| DOCK| null| 
## +---+----+------+------+------+ 

Se questi presupposti non è corretto' Dovrò pre-aggregare i tuoi dati. Ad esempio per il più comune valore ship:

from pyspark.sql.functions import max, struct 

(df_data 
    .groupby("id", "type", "date", "ship") 
    .count() 
    .groupby("id", "type") 
    .pivot("date") 
    .agg(max(struct("count", "ship"))) 
    .show()) 

## +---+----+--------+--------+--------+ 
## | id|type| 201601| 201602| 201603| 
## +---+----+--------+--------+--------+ 
## | 2| C|[1,DOCK]| null| null| 
## | 0| A|[1,DOCK]|[1,PORT]|[1,PORT]| 
## | 1| B|[1,PORT]|[1,DOCK]| null| 
## +---+----+--------+--------+--------+ 
Problemi correlati