2015-10-19 24 views
13

Sto usando Spark SQL (menziono che è in Spark nel caso in cui questo influisce sulla sintassi SQL - non sono abbastanza familiare per essere sicuro) e ho un tabella che sto cercando di ristrutturare, ma mi sto bloccando cercando di trasporre più colonne contemporaneamente.Esplodi (trasporre?) Più colonne nella tabella Spark SQL

Fondamentalmente ho dati che assomiglia:

userId someString  varA  varB 
    1  "example1" [0,2,5] [1,2,9] 
    2  "example2" [1,20,5] [9,null,6] 

e mi piacerebbe ad esplodere sia Vara varB contemporaneamente (la lunghezza sarà sempre consistente) - in modo che il risultato finale assomiglia a questo:

userId someString  varA  varB 
    1  "example1"  0   1 
    1  "example1"  2   2 
    1  "example1"  5   9 
    2  "example2"  1   9 
    2  "example2"  20  null 
    2  "example2"  5   6 

ma riesco solo per ottenere una singola istruzione esplodere (var) a lavorare in un unico comando, e se provo a catena loro (cioè creare una tabella temporanea dopo il primo comando di esplodere) allora io, ovviamente, ottengo un enorme numero di righe duplicate e non necessarie.

Grazie mille!

risposta

21

Quello che vuoi non è possibile senza una UDF personalizzata. In Scala si potrebbe fare qualcosa di simile:

val data = sc.parallelize(Seq(
    """{"userId": 1, "someString": "example1", 
     "varA": [0, 2, 5], "varB": [1, 2, 9]}""", 
    """{"userId": 2, "someString": "example2", 
     "varA": [1, 20, 5], "varB": [9, null, 6]}""" 
)) 

val df = sqlContext.read.json(data) 

df.printSchema 
// root 
// |-- someString: string (nullable = true) 
// |-- userId: long (nullable = true) 
// |-- varA: array (nullable = true) 
// | |-- element: long (containsNull = true) 
// |-- varB: array (nullable = true) 
// | |-- element: long (containsNull = true) 

Ora possiamo definire zip UDF:

import org.apache.spark.sql.functions.{udf, explode} 

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) 

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
    $"userId", $"someString", 
    $"vars._1".alias("varA"), $"vars._2".alias("varB")).show 

// +------+----------+----+----+ 
// |userId|someString|varA|varB| 
// +------+----------+----+----+ 
// |  1| example1| 0| 1| 
// |  1| example1| 2| 2| 
// |  1| example1| 5| 9| 
// |  2| example2| 1| 9| 
// |  2| example2| 20|null| 
// |  2| example2| 5| 6| 
// +------+----------+----+----+ 

Con SQL prime:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) 
df.registerTempTable("df") 

sqlContext.sql(
    """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""") 
+0

questo può essere applicato su 3 colonne che sono di digita la sequenza? –

+0

@AmitKumar Sì, perché no? Dovrai regolare firma e corpo ma non è difficile. – zero323

+0

Mi chiedo se nella più recente API dei dataset si possa semplicemente usare map e zip gli array insieme senza creare l'UDF e se sarebbe più veloce/scalabile/ottimizzato dal motore di esecuzione di catalyst. Ci proverò quando alla console. – Davos