Spark RDD non ha tale api.
Ecco una versione basata su una pull request for rdd.span che dovrebbe funzionare:
import scala.reflect.ClassTag
import org.apache.spark.rdd._
def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = {
val splits = rdd.mapPartitions { iter =>
val (left, right) = iter.partition(p)
val iterSeq = Seq(left, right)
iterSeq.iterator
}
val left = splits.mapPartitions { iter => iter.next().toIterator}
val right = splits.mapPartitions { iter =>
iter.next()
iter.next().toIterator
}
(left, right)
}
val rdd = sc.parallelize(0 to 10, 2)
val (first, second) = split[Int](rdd, _ % 2 == 0)
first.collect
// Array[Int] = Array(0, 2, 4, 6, 8, 10)
fonte
2015-04-09 20:37:55
questo approccio funziona con Spark Java API? –
No, Java non ha metodi di estensione. –
Non dovresti usare 'rdd.cache()' prima di eseguire i filtri? Questo dovrebbe sicuramente aumentare la velocità del tuo secondo filtro. –