20

mi chiedo come posso raggiungere i seguenti obiettivi a Spark (Pyspark)Spark aggiungere nuova colonna a dataframe con valore dalla riga precedente

dataframe iniziale:

+--+---+ 
|id|num| 
+--+---+ 
|4 |9.0| 
+--+---+ 
|3 |7.0| 
+--+---+ 
|2 |3.0| 
+--+---+ 
|1 |5.0| 
+--+---+ 

risultante dataframe:

+--+---+-------+ 
|id|num|new_Col| 
+--+---+-------+ 
|4 |9.0| 7.0 | 
+--+---+-------+ 
|3 |7.0| 3.0 | 
+--+---+-------+ 
|2 |3.0| 5.0 | 
+--+---+-------+ 

In genere riesco ad "aggiungere" nuove colonne a un dataframe utilizzando qualcosa come: df.withColumn("new_Col", df.num * 10)

Tuttavia non ho idea di come sia possibile ottenere questo "spostamento di righe" per la nuova colonna, in modo che la nuova colonna abbia il valore di un campo della riga precedente (come mostrato nell'esempio). Inoltre, non ho trovato nulla nella documentazione dell'API su come accedere a una determinata riga in un DF per indice.

Qualsiasi aiuto sarebbe apprezzato.

risposta

24

È possibile utilizzare lag funzione finestra come segue

from pyspark.sql.functions import lag, col 
from pyspark.sql.window import Window 

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"]) 
w = Window().partitionBy().orderBy(col("id")) 
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show() 

## +---+---+-------+ 
## | id|num|new_col| 
## +---+---+-------| 
## | 2|3.0| 5.0| 
## | 3|7.0| 3.0| 
## | 4|9.0| 7.0| 
## +---+---+-------+ 

ma alcune questioni importanti:

  1. se avete bisogno di un'operazione globale (non partizionato da alcune altre colonne/colonne) è estremamente inefficiente.
  2. è necessario un modo naturale per ordinare i dati.

Mentre il secondo problema non è quasi mai un problema, il primo può essere un affare. Se questo è il caso, devi semplicemente convertire il tuo DataFrame in RDD e calcolare lag manualmente. Si veda ad esempio:

Altri link utili:

Problemi correlati