presumo si inizia con una sorta di schema piatto come questo:
root
|-- lat: double (nullable = false)
|-- long: double (nullable = false)
|-- key: string (nullable = false)
primo luogo permette di creare dati di esempio:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._
val rdd = sc.parallelize(
Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)
val schema = StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) ::
StructField("key", StringType, false) ::Nil)
val df = sqlContext.createDataFrame(rdd, schema)
Un modo semplice è quello di utilizzare una classe UDF e caso:
case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))
val dfRes = df.
withColumn("location", makeLocation(col("lat"), col("long"))).
drop("lat").
drop("long")
dfRes.printSchema
e otteniamo
root
|-- key: string (nullable = false)
|-- location: struct (nullable = true)
| |-- lat: double (nullable = false)
| |-- long: double (nullable = false)
Un modo più difficile è quello di trasformare i dati e applicare lo schema seguito:
val rddRes = df.
map{case Row(lat, long, key) => Row(key, Row(lat, long))}
val schemaRes = StructType(
StructField("key", StringType, false) ::
StructField("location", StructType(
StructField("lat", DoubleType, false) ::
StructField("long", DoubleType, false) :: Nil
), true) :: Nil
)
sqlContext.createDataFrame(rddRes, schemaRes).show
e otteniamo un output previsto
+------+-------------+
| key| location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte| [42.3,9.15]|
+------+-------------+
Creazione dello schema annidato da zero può essere noioso quindi se si può Consiglierei il primo approccio. Esso può essere facilmente esteso se avete bisogno di struttura più sofisticata:
case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long))
df.
withColumn("pin", makePin(col("lat"), col("long"))).
drop("lat").
drop("long").
printSchema
e otteniamo previsto in uscita:
root
|-- key: string (nullable = false)
|-- pin: struct (nullable = true)
| |-- location: struct (nullable = true)
| | |-- lat: double (nullable = false)
| | |-- long: double (nullable = false)
Purtroppo si ha alcun controllo su nullable
campo quindi se è importante per il vostro progetto Avrete devi specificare lo schema.
Infine è possibile utilizzare la funzione struct
introdotta in 1.4:
import org.apache.spark.sql.functions.struct
df.select($"key", struct($"lat", $"long").alias("location"))
Grazie @ zero323 per la risposta esauriente! Questo aiuta un mucchio. Sapresti come potrei fare questa mappatura in modo ricorsivo per i tipi annidati? Questi dati sono più brutti di quanto speravo. –
Non vedo alcun motivo per cui non potresti. – zero323
Ciao @ zero323 - Sai se è comunque possibile utilizzare il metodo UDF per creare una struttura se sono presenti più di 10 colonne nella nuova struttura? Le UDF sembrano avere una limitazione su 10 variabili di input. –