2015-08-12 34 views
14

Sto usando PySpark e ho un dataframe Spark con un gruppo di colonne numeriche. Voglio aggiungere una colonna che è la somma di tutte le altre colonne.Aggiungi colonna somma come nuova colonna in PySpark dataframe

Supponiamo che il mio dataframe avesse colonne "a", "b" e "c". So che posso fare questo:

df.withColumn('total_col', df.a + df.b + df.c) 

Il problema è che io non voglio di digitare ogni colonna singolarmente e aggiungerli, soprattutto se ho un sacco di colonne. Voglio essere in grado di farlo automaticamente o specificando un elenco di nomi di colonne che voglio aggiungere. c'è un altro modo per fare ciò?

+0

Questo è molto più facile con RDDs rispetto dataframes esempio se i dati sono una matrice che rappresenta una riga, allora puoi fare 'RDD.map (dati lambda: (dati, somma (dati)))'. Il motivo principale per cui questo è più difficile con un dataframe di scintilla è capire cosa è permesso come espressione di una colonna in 'withColumn'. Non sembra essere molto ben documentato. – Paul

+0

Anche questo non funziona (PySpark 1.6.3): 'dftest.withColumn (" times ", sum ((dftest [c]> 2) .cast (" int ") per c in dftest.columns [1:])) 'e quindi, ' dftest.select ('a', 'b', 'c', 'd'). Rdd.map (lambda x: (x, sum (x))) .prendere (2) ' Non sembra funzionare –

risposta

24

Questo non era ovvio. Non vedo alcuna somma basata su righe delle colonne definite nell'API di scintilla Dataframes.

versione 2

questo può essere fatto in un modo abbastanza semplice:

newdf = df.withColumn('total', sum(df[col] for col in df.columns)) 

df.columns è fornito da pyspark come una lista di stringhe che danno tutti i nomi delle colonne nel dataframe Spark. Per una somma diversa, puoi invece fornire qualsiasi altro elenco di nomi di colonne.

Non ho provato questo come la mia prima soluzione perché non ero sicuro di come si sarebbe comportato. Ma funziona.

Versione 1

Questo è troppo complicato, ma funziona bene.

Si può fare questo:

  1. uso df.columns per ottenere un elenco dei nomi delle colonne
  2. uso che elencano i nomi a fare una lista delle colonne
  3. passaggio che elencano a qualcosa che invocherà la funzione add sovraccarico della colonna in un fold-type functional manner

Con pitone di reduce, certa conoscenza di come l'overloading degli operatori opere e il codice pyspark per colonne here che diventa:

def column_add(a,b): 
    return a.__add__(b) 

newdf = df.withColumn('total_col', 
     reduce(column_add, (df[col] for col in df.columns))) 

Nota questo è un pitone ridurre, non una scintilla RDD ridurre, e il termine parentesi nel secondo parametro di ridurre richiede la parentesi perché è un generatore di espressione lista.

Testato, Funziona!

$ pyspark 
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache() 
>>> df 
DataFrame[a: bigint, b: bigint, c: bigint] 
>>> df.columns 
['a', 'b', 'c'] 
>>> def column_add(a,b): 
...  return a.__add__(b) 
... 
>>> df.withColumn('total', reduce(column_add, (df[col] for col in df.columns))).collect() 
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)] 
+0

@Salmonerd Grazie. A volte aiuta a ricordare che la classe spark dataframe è immutabile e quindi per apportare modifiche ai dati è necessario chiamare qualcosa che restituisca un nuovo dataframe. – Paul

+3

La versione 2 non funziona con Spark 1.5.0 e CDH-5.5.2. e Python versione 3.4. Viene generato un errore: "AttributeError: l'oggetto 'generator' non ha attributo '_get_object_id" – Hemant

+0

Entrambe le soluzioni sono belle e ordinate. Mi chiedo perché non hai usato le funzioni definite dall'utente per questo? – ThePrincess

Problemi correlati