2015-04-09 17 views
13

devo dividere un RDD in 2 parti:Apache filtro Spark RDD in due RDDs

1 parte che soddisfa una condizione; un'altra parte che non lo fa. Posso fare filter due volte sul RDD originale ma sembra inefficiente. C'è un modo per fare ciò che cerco? Non riesco a trovare nulla nell'API o nella letteratura.

risposta

17

Spark non supporta questo predefinito. Il filtro sugli stessi dati due volte non è poi così male se lo si memorizza prima e il filtro stesso è veloce.

Se è davvero solo due tipi diversi, è possibile utilizzare un metodo di supporto:

implicit class RDDOps[T](rdd: RDD[T]) { 
    def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = { 
    val passes = rdd.filter(f) 
    val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot 
    (passes, fails) 
    } 
} 

val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0) 

Ma non appena si dispone di più tipi di dati, basta assegnare il filtrato in un nuovo val.

+0

questo approccio funziona con Spark Java API? –

+0

No, Java non ha metodi di estensione. –

+1

Non dovresti usare 'rdd.cache()' prima di eseguire i filtri? Questo dovrebbe sicuramente aumentare la velocità del tuo secondo filtro. –

0

Se si sta bene con uno T anziché uno RDD[T], quindi è possibile do this. In caso contrario, si potrebbe forse fare qualcosa di simile:

val data = sc.parallelize(1 to 100) 
val splitData = data.mapPartitions{iter => { 
    val splitList = (iter.toList).partition(_%2 == 0) 
    Tuple1(splitList).productIterator 
    } 
}.map(_.asInstanceOf[Tuple2[List[Int],List[Int]]]) 

E, allora si avrà probabilmente bisogno di ridurre questo fino a fondere le liste quando si va a eseguire un'azione

+0

Mi piacerebbe scoprire perché questo era giù votato in quanto è l'unica risposta che le risposte in realtà i PO interrogano –

+0

(nota: non ho vote down) il metodo è interessante ma non risponde alla domanda. L'OP ha richiesto una 'partition (RDD [A], A => Boolean): (RDD [A], RDD [A])', la vostra sarebbe 'partition (RDD [A], A => Boolean): RDD [Elenco [A], Elenco [A]] ' –

3

Spark RDD non ha tale api.

Ecco una versione basata su una pull request for rdd.span che dovrebbe funzionare:

import scala.reflect.ClassTag 
import org.apache.spark.rdd._ 

def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = { 

    val splits = rdd.mapPartitions { iter => 
     val (left, right) = iter.partition(p) 
     val iterSeq = Seq(left, right) 
     iterSeq.iterator 
    } 

    val left = splits.mapPartitions { iter => iter.next().toIterator} 

    val right = splits.mapPartitions { iter => 
     iter.next() 
     iter.next().toIterator 
    } 
    (left, right) 
} 

val rdd = sc.parallelize(0 to 10, 2) 

val (first, second) = split[Int](rdd, _ % 2 == 0) 

first.collect 
// Array[Int] = Array(0, 2, 4, 6, 8, 10) 
+0

Controlla anche questo: https://issues.apache.org/jira/browse/SPARK-3533 –

+1

Scommetto che questo è più complesso e meno efficiente di due filtri –

+0

@JustinPihony sì, i filtri sono molto più efficienti. –

3

Il punto è che non si vuole fare un filtro, ma una mappa.

(T) -> (Boolean, T) 

Siamo spiacenti, non sono efficiente in Scala Sintassi. Ma l'idea è che hai diviso la tua risposta mappandola a coppie chiave/valore. La chiave può essere un valore booleano che indica avvizzire o meno il predicato 'Filtro'.

+0

Mi piace questo dal punto di vista della semplicità –

0

È possibile utilizzare subtract function (Se l'operazione di filtro è troppo costosa).

codice PySpark:

rdd1 = data.filter(filterFunction) 

rdd2 = data.subtract(rdd1) 
Problemi correlati