Per poter utilizzare la funzione finestra è necessario innanzitutto creare una finestra. La definizione è praticamente la stessa di SQL normale, significa che è possibile definire l'ordine, la partizione o entrambi. In primo luogo permette di creare un po 'di dati fittizi:
import numpy as np
np.random.seed(1)
keys = ["foo"] * 10 + ["bar"] * 10
values = np.hstack([np.random.normal(0, 1, 10), np.random.normal(10, 1, 100)])
df = sqlContext.createDataFrame([
{"k": k, "v": round(float(v), 3)} for k, v in zip(keys, values)])
Assicurarsi che si sta utilizzando HiveContext
(Spark < 2.0):
from pyspark.sql import HiveContext
assert isinstance(sqlContext, HiveContext)
Creare una finestra:
from pyspark.sql.window import Window
w = Window.partitionBy(df.k).orderBy(df.v)
che è equivalente a
(PARTITION BY k ORDER BY v)
in SQL.
Come regola generale, le definizioni della finestra devono sempre contenere la clausola PARTITION BY
altrimenti Spark trasferirà tutti i dati in una singola partizione. ORDER BY
è richiesto per alcune funzioni, mentre in diversi casi (tipicamente aggregati) può essere facoltativo.
Esistono anche due opzioni facoltative che possono essere utilizzate per definire l'intervallo finestra - ROWS BETWEEN
e RANGE BETWEEN
. Questi non saranno utili per noi in questo particolare scenario.
Infine possiamo usarlo per una query:
from pyspark.sql.functions import percentRank, ntile
df.select(
"k", "v",
percentRank().over(w).alias("percent_rank"),
ntile(3).over(w).alias("ntile3")
)
Nota che ntile
non è legato in alcun modo ai quantili.