2015-07-24 31 views
12

Attualmente sto cercando di estrarre un database da MongoDB e utilizzare Spark per importare in ElasticSearch con geo_points.Come aggiungere una nuova colonna Struct a un DataFrame

Il database Mongo ha valori di latitudine e longitudine, ma ElasticSearch richiede che vengano inseriti nel tipo geo_point.

C'è un modo in Spark per copiare i lat e lon colonne a una nuova colonna che è un array o struct?

Qualsiasi aiuto è apprezzato!

risposta

33

presumo si inizia con una sorta di schema piatto come questo:

root 
|-- lat: double (nullable = false) 
|-- long: double (nullable = false) 
|-- key: string (nullable = false) 

primo luogo permette di creare dati di esempio:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{col, udf} 
import org.apache.spark.sql.types._ 

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil) 

val schema = StructType(
    StructField("lat", DoubleType, false) :: 
    StructField("long", DoubleType, false) :: 
    StructField("key", StringType, false) ::Nil) 

val df = sqlContext.createDataFrame(rdd, schema) 

Un modo semplice è quello di utilizzare una classe UDF e caso:

case class Location(lat: Double, long: Double) 
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long)) 

val dfRes = df. 
    withColumn("location", makeLocation(col("lat"), col("long"))). 
    drop("lat"). 
    drop("long") 

dfRes.printSchema 

e otteniamo

root 
|-- key: string (nullable = false) 
|-- location: struct (nullable = true) 
| |-- lat: double (nullable = false) 
| |-- long: double (nullable = false) 

Un modo più difficile è quello di trasformare i dati e applicare lo schema seguito:

val rddRes = df. 
    map{case Row(lat, long, key) => Row(key, Row(lat, long))} 

val schemaRes = StructType(
    StructField("key", StringType, false) :: 
    StructField("location", StructType(
     StructField("lat", DoubleType, false) :: 
     StructField("long", DoubleType, false) :: Nil 
    ), true) :: Nil 
) 

sqlContext.createDataFrame(rddRes, schemaRes).show 

e otteniamo un output previsto

+------+-------------+ 
| key|  location| 
+------+-------------+ 
|Warsaw|[52.23,21.01]| 
| Corte| [42.3,9.15]| 
+------+-------------+ 

Creazione dello schema annidato da zero può essere noioso quindi se si può Consiglierei il primo approccio. Esso può essere facilmente esteso se avete bisogno di struttura più sofisticata:

case class Pin(location: Location) 
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)) 

df. 
    withColumn("pin", makePin(col("lat"), col("long"))). 
    drop("lat"). 
    drop("long"). 
    printSchema 

e otteniamo previsto in uscita:

root 
|-- key: string (nullable = false) 
|-- pin: struct (nullable = true) 
| |-- location: struct (nullable = true) 
| | |-- lat: double (nullable = false) 
| | |-- long: double (nullable = false) 

Purtroppo si ha alcun controllo su nullable campo quindi se è importante per il vostro progetto Avrete devi specificare lo schema.

Infine è possibile utilizzare la funzione struct introdotta in 1.4:

import org.apache.spark.sql.functions.struct 

df.select($"key", struct($"lat", $"long").alias("location")) 
+0

Grazie @ zero323 per la risposta esauriente! Questo aiuta un mucchio. Sapresti come potrei fare questa mappatura in modo ricorsivo per i tipi annidati? Questi dati sono più brutti di quanto speravo. –

+0

Non vedo alcun motivo per cui non potresti. – zero323

+0

Ciao @ zero323 - Sai se è comunque possibile utilizzare il metodo UDF per creare una struttura se sono presenti più di 10 colonne nella nuova struttura? Le UDF sembrano avere una limitazione su 10 variabili di input. –

1

Prova questo:

import org.apache.spark.sql.functions._ 

df.registerTempTable("dt") 

dfres = sql("select struct(lat,lon) as colName from dt") 
Problemi correlati