7

a Spark DataFrame contiene una colonna di tipo Array [Double]. Genera un'eccezione ClassCastException quando provo a recuperarla in una funzione map(). Il seguente codice di scala genera un'eccezione.Colonna Access Array in Spark

case class Dummy(x:Array[Double]) 
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3)))) 
val s = df.map(r => { 
    val arr:Array[Double] = r.getAs[Array[Double]]("x") 
    arr.sum 
}) 
s.foreach(println) 

L'eccezione è

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Cam qualcuno mi spiega perché non funziona? cosa dovrei fare invece? Sto usando Spark 1.5.1 e 2.10.6 scala

Grazie

risposta

19

ArrayType è rappresentata in un come scala.collection.mutable.WrappedArray. È possibile estrarre utilizzando ad esempio

val arr: Seq[Double] = r.getAs[Seq[Double]]("x") 

o

val i: Int = ??? 
val arr = r.getSeq[Double](i) 

o anche:

import scala.collection.mutable.WrappedArray 

val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x") 

Se DataFrame è relativamente sottile, allora il pattern matching potrebbe essere un approccio migliore:

import org.apache.spark.sql.Row 

df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)} 

anche se è necessario tenere presente che il tipo di sequenza è deselezionato.

In Spark> = 1.6 è anche possibile utilizzare Dataset come segue:

df.select("x").as[Seq[Double]].rdd 
Problemi correlati