Questo non era ovvio. Non vedo alcuna somma basata su righe delle colonne definite nell'API di scintilla Dataframes.
versione 2
questo può essere fatto in un modo abbastanza semplice:
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
df.columns
è fornito da pyspark come una lista di stringhe che danno tutti i nomi delle colonne nel dataframe Spark. Per una somma diversa, puoi invece fornire qualsiasi altro elenco di nomi di colonne.
Non ho provato questo come la mia prima soluzione perché non ero sicuro di come si sarebbe comportato. Ma funziona.
Versione 1
Questo è troppo complicato, ma funziona bene.
Si può fare questo:
- uso
df.columns
per ottenere un elenco dei nomi delle colonne
- uso che elencano i nomi a fare una lista delle colonne
- passaggio che elencano a qualcosa che invocherà la funzione add sovraccarico della colonna in un fold-type functional manner
Con pitone di reduce, certa conoscenza di come l'overloading degli operatori opere e il codice pyspark per colonne here che diventa:
def column_add(a,b):
return a.__add__(b)
newdf = df.withColumn('total_col',
reduce(column_add, (df[col] for col in df.columns)))
Nota questo è un pitone ridurre, non una scintilla RDD ridurre, e il termine parentesi nel secondo parametro di ridurre richiede la parentesi perché è un generatore di espressione lista.
Testato, Funziona!
$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
... return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, (df[col] for col in df.columns))).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
Questo è molto più facile con RDDs rispetto dataframes esempio se i dati sono una matrice che rappresenta una riga, allora puoi fare 'RDD.map (dati lambda: (dati, somma (dati)))'. Il motivo principale per cui questo è più difficile con un dataframe di scintilla è capire cosa è permesso come espressione di una colonna in 'withColumn'. Non sembra essere molto ben documentato. – Paul
Anche questo non funziona (PySpark 1.6.3): 'dftest.withColumn (" times ", sum ((dftest [c]> 2) .cast (" int ") per c in dftest.columns [1:])) 'e quindi, ' dftest.select ('a', 'b', 'c', 'd'). Rdd.map (lambda x: (x, sum (x))) .prendere (2) ' Non sembra funzionare –