2015-03-19 23 views
7

Uso l'API DataFrame da Spark 1.3.Funzionamento su frame dati

Mi piacerebbe ottenere il giorno della settimana da una data in un DataFrame senza perdere tutti gli elementi di DataFrame.

Ho usato jodatime per scaricarlo su una semplice mappa prima di utilizzare l'API DataFrame.

In questo momento una soluzione che funziona:

sqlContext.createDataFrame(myDataFrame.map(l=>operationOnTheField(l)),myDataFrame.schema)) 

E 'possibile fare l'operazione senza tornare a una mappa su un RDD[Row] e quindi creare un dataframe con questo RDD?

risposta

1

Prova questa

Table.select(Table("Otherkey"),MyUdf(Table("ColNeeded")).as("UdfTransformed")) 

MyUdf è un'UDF definito da voi.

10

È possibile utilizzare una combinazione di chiamata select() su DataFrame e una funzione definita dall'utente (UDF) per trasformare la colonna in questione.

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.functions._ 

Una classe case per impostare l'esempio DataFrame.

private case class Cust(id: Integer, name: String, 
     sales: Double, discount: Double, state: String) 

quindi impostare una SQLContext e creare il DataFrame come segue:

import sqlContext.implicits._ 

val custs = Seq(
    Cust(1, "Widget Co", 120000.00, 0.00, "AZ"), 
    Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"), 
    Cust(3, "Widgetry", 410500.00, 200.00, "CA"), 
    Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"), 
    Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA") 
) 
val customerDF = sc.parallelize(custs, 4).toDF() 

Registrare un semplice UDF che userete per trasformare la colonna "sconto".

val myFunc = udf {(x: Double) => x + 1} 

Ottieni le colonne, dopo aver applicato l'UDF alla colonna "sconto" e lasciando gli altri come erano.

val colNames = customerDF.columns 
val cols = colNames.map(cName => customerDF.col(cName)) 
val theColumn = customerDF("discount") 

Mi piacerebbe trovare un modo "migliore" per abbinare la colonna, ma i seguenti lavori. Usa as() per dare alla colonna un nuovo nome solo perché possiamo!

val mappedCols = cols.map(c => 
    if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c) 

Usa select() per produrre il nuovo DataFrame

val newDF = customerDF.select(mappedCols:_*) 

Hai cambiato

id name   sales discount state 
1 Widget Co  120000.0 0.0  AZ 
2 Acme Widgets 410500.0 500.0 CA 
3 Widgetry  410500.0 200.0 CA 
4 Widgets R Us 410500.0 0.0  CA 
5 Ye Olde Widgete 500.0 0.0  MA 

in

id name   sales transformed state 
1 Widget Co  120000.0 1.0   AZ 
2 Acme Widgets 410500.0 501.0  CA 
3 Widgetry  410500.0 201.0  CA 
4 Widgets R Us 410500.0 1.0   CA 
5 Ye Olde Widgete 500.0 1.0   MA 

È possibile trovare l'esempio completo source code here . Puoi renderlo più semplice se non sei pignolo per una sostituzione esatta della colonna.

+2

Grazie per esempio. Dal mio punto di vista le UDF in spark data frame API non sono molto eleganti e intuitive = ( –

Problemi correlati