2015-09-04 21 views
9

Descrizione

Dato un dataframe dfSpark e SparkSQL: come imitare la funzione finestra?

id |  date 
--------------- 
1 | 2015-09-01 
2 | 2015-09-01 
1 | 2015-09-03 
1 | 2015-09-04 
2 | 2015-09-04 

Voglio creare un contatore in esecuzione o di un indice,

  • raggruppati per lo stesso ID e
  • ordinato per data in quel gruppo,

quindi

id |  date | counter 
-------------------------- 
1 | 2015-09-01 |  1 
1 | 2015-09-03 |  2 
1 | 2015-09-04 |  3 
2 | 2015-09-01 |  1 
2 | 2015-09-04 |  2 

Questo è qualcosa che posso ottenere con la funzione finestra, ad es.

val w = Window.partitionBy("id").orderBy("date") 
val resultDF = df.select(df("id"), rowNumber().over(w)) 

Purtroppo, Spark 1.4.1 non supporta funzioni finestra per dataframes regolari:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext; 

Domande

  • Come posso raggiungere il calcolo di cui sopra su Spark corrente 1.4.1 senza usando le funzioni della finestra?
  • Quando in Spark saranno supportate le funzioni della finestra per i dataframes regolari?

Grazie!

+0

È necessario utilizzare i dataframes e SQL oppure è possibile utilizzare gli RDD? Questo è abbastanza semplice con il metodo groupBy. –

+0

@KirkBroadhurst: anche gli RDD vanno bene. Potresti per favore abbozzare la tua idea con un piccolo estratto di codice? A partire da SparkSQL attualmente non vedo in che modo come si fa: hai un'idea? –

risposta

6

È possibile farlo con RDD. Personalmente trovo che l'API per gli RDD abbia molto più senso - non sempre voglio che i miei dati siano "piatti" come un dataframe.

val df = sqlContext.sql("select 1, '2015-09-01'" 
    ).unionAll(sqlContext.sql("select 2, '2015-09-01'") 
    ).unionAll(sqlContext.sql("select 1, '2015-09-03'") 
    ).unionAll(sqlContext.sql("select 1, '2015-09-04'") 
    ).unionAll(sqlContext.sql("select 2, '2015-09-04'")) 

// dataframe as an RDD (of Row objects) 
df.rdd 
    // grouping by the first column of the row 
    .groupBy(r => r(0)) 
    // map each group - an Iterable[Row] - to a list and sort by the second column 
    .map(g => g._2.toList.sortBy(row => row(1).toString))  
    .collect() 

È possibile che questo dà un risultato simile al seguente:

Array[List[org.apache.spark.sql.Row]] = 
Array(
    List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
    List([2,2015-09-01], [2,2015-09-04])) 

Se si desidera che la posizione all'interno del 'gruppo' così, è possibile utilizzare zipWithIndex.

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect() 

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
    List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)), 
    List(([2,2015-09-01],0), ([2,2015-09-04],1))) 

Si potrebbe appiattire questo nuovo ad un semplice elenco/Array of oggetti utilizzando FlatMap, ma se è necessario eseguire qualsiasi cosa sul 'gruppo' che non sarà una grande idea.

Lo svantaggio di utilizzare RDD come questo è che è noioso convertire da DataFrame in RDD e viceversa.

+0

Mille grazie !!! Questa era la soluzione, stavo cercando. Hmm, non ero abbastanza "coraggioso", per eseguire le normali operazioni di lista 'Scala, una volta che il' groupBy' era finito ... –

+0

cosa succederà quando la mia lista "g._2.toList.sortBy" ha milioni di elementi, non posso raccoglierli – halil

7

È possibile utilizzare HiveContext per il locale DataFrames e, a meno che non si abbia una buona ragione per non farlo, probabilmente è comunque una buona idea. È un valore predefinito SQLContext disponibile in spark-shell e pyspark shell (come per ora sparkR sembra utilizzare SQLContext semplice e il suo parser è raccomandato da Spark SQL and DataFrame Guide.

import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.rowNumber 

object HiveContextTest { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("Hive Context") 
    val sc = new SparkContext(conf) 
    val sqlContext = new HiveContext(sc) 
    import sqlContext.implicits._ 

    val df = sc.parallelize(
     ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil 
    ).toDF("k", "v") 

    val w = Window.partitionBy($"k").orderBy($"v") 
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show 
    } 
} 
3

Sono assolutamente d'accordo sul fatto che le funzioni Window per DataFrames sono la strada da percorrere se si dispone della versione di Spark (> =) 1.5. Ma se sei davvero bloccato con una versione precedente (ad esempio 1.4.1), ecco un modo hacky per risolvere questo

val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (1, "2015-09-04") :: Nil) 
      .toDF("id", "date") 

val dfDuplicate = df.selecExpr("id as idDup", "date as dateDup") 
val dfWithCounter = df.join(dfDuplicate,$"id"===$"idDup") 
         .where($"date"<=$"dateDup") 
         .groupBy($"id", $"date") 
         .agg($"id", $"date", count($"idDup").as("counter")) 
         .select($"id",$"date",$"counter") 

Ora, se si fa dfWithCounter.show

otterrete:

+---+----------+-------+               
| id|  date|counter| 
+---+----------+-------+ 
| 1|2015-09-01|  1| 
| 1|2015-09-04|  3| 
| 1|2015-09-03|  2| 
| 2|2015-09-01|  1| 
| 2|2015-09-04|  2| 
+---+----------+-------+ 

Nota che date non è ordinato, ma il counter è corretto . Inoltre, è possibile modificare l'ordinamento di counter modificando <= in >= nell'istruzione where.

Problemi correlati