2015-05-18 12 views
25

Sto cercando di risolvere l'annoso problema di aggiungere un numero di sequenza a un set di dati. Sto lavorando con DataFrame e sembra che non ci sia DataFrame equivalente a RDD.zipWithIndex. D'altra parte, le seguenti opere più o meno il modo in cui voglio che:DataFrame-zip zipWithIndex

val origDF = sqlContext.load(...)  

val seqDF= sqlContext.createDataFrame(
    origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)), 
    StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields) 
) 

Nella mia applicazione effettiva, origDF non verrà caricato direttamente da un file - che sta per essere creato da unendo altri 2-3 DataFrames e conterrà oltre 100 milioni di righe.

C'è un modo migliore per farlo? Cosa posso fare per ottimizzarlo?

risposta

3

Dal Spark 1.6 c'è una funzione chiamata monotonically_increasing_id()
genera un nuovo colonna con indice monotonico unico a 64 bit per ogni riga
Ma non è consequenziale, ogni partizione inizia un nuovo intervallo, quindi dobbiamo calcolare ogni offset di partizione bef minerale usandolo.
cercando di fornire una soluzione "senza RDD", ho finito con un po 'di raccogliere(), ma raccoglie solo offset, un valore per partizione, in modo da non causare OOM

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = { 
    val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id", monotonically_increasing_id()) 

    val partitionOffsets = dfWithPartitionId 
     .groupBy("partition_id") 
     .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id") 
     .orderBy("partition_id") 
     .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt") 
     .collect() 
     .map(_.getLong(0)) 
     .toArray 

    dfWithPartitionId 
     .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id"))) 
     .withColumn(indexName, col("partition_offset") + col("inc_id")) 
     .drop("partition_id", "partition_offset", "inc_id") 
}

Questa soluzione non reimposta le righe originali e non ripartiziona l'enorme dataframe originale, quindi è abbastanza veloce nel mondo reale: 200 GB di dati CSV (43 milioni di righe con 150 colonne) letti, indicizzati e impacchettati sul parquet in 2 minuti 240 core
Dopo aver testato la mia soluzione, ho eseguito Kirk Broadhurst's solution ed era 20 secondi più lento
Tu potrebbe volere o non voler utilizzare dfWithPartitionId.cache(), dipende dall'attività

+0

Nice! Buon lavoro! –

27

Quanto segue è stato pubblicato per conto di David Griffin (modificato fuori questione).

Il metodo dfZipWithIndex, tutto cantante e completamente danzante. È possibile impostare l'offset iniziale (che di default a 1), il nome della colonna di indice (il valore predefinito è "id"), e posizionare la colonna nella parte anteriore o posteriore:

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.types.{LongType, StructField, StructType} 
import org.apache.spark.sql.Row 


def dfZipWithIndex(
    df: DataFrame, 
    offset: Int = 1, 
    colName: String = "id", 
    inFront: Boolean = true 
) : DataFrame = { 
    df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln => 
     Row.fromSeq(
     (if (inFront) Seq(ln._2 + offset) else Seq()) 
      ++ ln._1.toSeq ++ 
     (if (inFront) Seq() else Seq(ln._2 + offset)) 
    ) 
    ), 
    StructType(
     (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) 
     ++ df.schema.fields ++ 
     (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false))) 
    ) 
) 
} 
+0

@eliasah - Ho trovato un modo di espressione 'Window' per farlo. È molto più lento, tuttavia, ma immagina che potresti voler dare un'occhiata. Vedi la risposta qui sotto. –

+0

È fantastico. Qualche riferimento a una versione di PySpark? Grazie per la condivisione. – Tagar

5

A partire dal Spark 1.5, Window espressioni sono stati aggiunti a Spark. Invece di dover convertire lo DataFrame in un RDD, ora è possibile utilizzare org.apache.spark.sql.expressions.row_number. Si noti che ho trovato le prestazioni per il precedente dfZipWithIndex per essere significativamente più veloce dell'algoritmo di seguito. Ma mi distacco perché:

  1. Qualcun altro sta per essere tentati di provare questo
  2. Forse qualcuno in grado di ottimizzare le espressioni di seguito

Ad ogni modo, ecco cosa funziona per me:

import org.apache.spark.sql.expressions._ 

df.withColumn("row_num", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1)))) 

Nota che uso lit(1) sia per il partizionamento e l'ordinamento - questo rende tutto essere nella stessa partizione, e sembra conservare l'originale ordinamento del DataFrame, ma suppongo che sia ciò che rallenta verso il basso.

ho testato su un 4 colonne DataFrame con 7.000.000 righe e la differenza di velocità è significativo tra questo e il sopra dfZipWithIndex (come ho detto, le funzioni RDD è molto, molto più veloce). Versione

+2

Non causerebbe l'errore OOM se il set di dati non si adatta alla memoria di un singolo operatore? –

+1

Non ne ho idea - so solo che è molto, molto più lento del 'zipWithIndex' basato su' RDD', e questo è stato più che sufficiente per smettere di pensarci. Ho postato quanto sopra in modo che le altre persone non siano tentate di andare troppo avanti su questa strada; l'originale 'dfZipWithIndex' sembra ancora essere l'approccio migliore. –

+0

Grazie per aver condiviso questo, ho pensato che il modo di non convertire DF in RDD sarebbe inizialmente più veloce, e non andrà troppo lontano in questo modo ora. –

0

PySpark:

from pyspark.sql.types import LongType, StructField, StructType 

def dfZipWithIndex (df, offset=1, colName="rowId"): 
    ''' 
     Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe 
     and preserves a schema 

     :param df: source dataframe 
     :param offset: adjustment to zipWithIndex()'s index 
     :param colName: name of the index column 
    ''' 

    new_schema = StructType(
        [StructField(colName,LongType(),True)]  # new added field in front 
        + df.schema.fields       # previous schema 
       ) 

    zipped_rdd = df.rdd.zipWithIndex() 

    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row))) 

    return spark.createDataFrame(new_rdd, new_schema) 

creato anche un jira per aggiungere questa funzionalità in Spark nativamente: https://issues.apache.org/jira/browse/SPARK-23074