2014-12-01 8 views
5

C'è qualche funzione Spark che consente di dividere una raccolta in diversi RDD secondo alcuni creteria? Tale funzione consentirebbe di evitare un'eccessiva iterazione. Per esempio:Scala Spark: dividere la raccolta in diversi RDD?

def main(args: Array[String]) { 
    val logFile = "file.txt" 
    val conf = new SparkConf().setAppName("Simple Application") 
    val sc = new SparkContext(conf) 
    val logData = sc.textFile(logFile, 2).cache() 
    val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt") 
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt") 
    } 

In questo esempio devo iterare 'logData` due volte solo a dare risultati in due file separati:

val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt") 
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt") 

Sarebbe bello invece di avere qualcosa di simile:

val resultMap = logData.map(line => if line.contains("a") ("a", line) else if line.contains("b") ("b", line) else (" - ", line) 
    resultMap.writeByKey("a", "linesA.txt") 
    resultMap.writeByKey("b", "linesB.txt") 

Una cosa del genere?

+0

Sembra che tu voglia una variante groupBy che restituisce una mappa sottostringa-> RDD, piuttosto che un RDD di sottostringa-> Iterable [String]? –

risposta

3

Dai un'occhiata alla seguente domanda.

Write to multiple outputs by key Spark - one Spark job

È possibile flatMap un RDD con una funzione simile alla seguente e poi fare un groupBy sulla chiave.

def multiFilter(words:List[String], line:String) = for { word <- words; if line.contains(word) } yield { (word,line) } 
val filterWords = List("a","b") 
val filteredRDD = logData.flatMap(line => multiFilter(filterWords, line)) 
val groupedRDD = filteredRDD.groupBy(_._1) 

Ma a seconda delle dimensioni del vostro RDD di ingresso si può o non vedere alcun miglioramento delle prestazioni, perché una delle groupBy operazioni comporta un riordino.

D'altra parte se si dispone di memoria sufficiente nel cluster Spark è possibile memorizzare nella cache l'RDD di input e quindi eseguire più operazioni di filtro potrebbe non essere costoso come si pensa.

+0

Grazie. Tuttavia, quando chiamato su un set di dati di coppie (K, V), 'groupBy' restituisce un ** unico insieme di dati RDD ** contante delle coppie (K, Iterable ). Quindi non c'è modo di ottenere una ** raccolta di RDD ** come risultato delle trasformazioni di Spark? – zork

3

Forse qualcosa di simile potrebbe funzionare:

def singlePassMultiFilter[T](
     rdd: RDD[T], 
     f1: T => Boolean, 
     f2: T => Boolean, 
     level: StorageLevel = StorageLevel.MEMORY_ONLY 
): (RDD[T], RDD[T], Boolean => Unit) = { 
    val tempRDD = rdd mapPartitions { iter => 
    val abuf1 = ArrayBuffer.empty[T] 
    val abuf2 = ArrayBuffer.empty[T] 
    for (x <- iter) { 
     if (f1(x)) abuf1 += x 
     if (f2(x)) abuf2 += x 
    } 
    Iterator.single((abuf1, abuf2)) 
    } 
    tempRDD.persist(level) 
    val rdd1 = tempRDD.flatMap(_._1) 
    val rdd2 = tempRDD.flatMap(_._2) 
    (rdd1, rdd2, (blocking: Boolean) => tempRDD.unpersist(blocking)) 
} 

Nota che un'azione invitato rdd1 (resp rdd2.) Causerà tempRDD da calcolare e persistente. Questo è praticamente equivalente al calcolo di rdd2 (risp. rdd1) poiché il sovraccarico di flatMap nelle definizioni di rdd1 e di rdd2 è, credo, molto trascurabile.

Si potrebbe utilizzare singlePassMultiFitler in questo modo:

val (rdd1, rdd2, cleanUp) = singlePassMultiFilter(rdd, f1, f2) 
rdd1.persist() //I'm going to need `rdd1` more later... 
println(rdd1.count) 
println(rdd2.count) 
cleanUp(true)  //I'm done with `rdd2` and `rdd1` has been persisted so free stuff up... 
println(rdd1.distinct.count) 

Chiaramente questo potrebbe esteso a un numero arbitrario di filtri, collezioni di filtri, ecc

+0

Può essere più efficiente a un livello molto basso (utilizzo della CPU della CPU e cose del genere anche se può essere consumato dalla manutenzione di ArrayBuffer) ma al livello più alto svolge esattamente la stessa quantità di lavoro del filtro ripetuto su un 'rdd memorizzato nella cache . Tuttavia sembra molto meglio di una risposta accettata che peggiora la situazione. – zero323

+0

L'approccio che ho suggerito richiede solo iterare attraverso 'rdd' una volta, mentre il filtro ripetuto su' rdd' richiederà iterare più volte 'rdd'. Inoltre, se si sa che i criteri di filtraggio si escludono a vicenda (probabilmente vero per molti usi), allora una maggiore efficienza può essere ottenuta semplicemente sostituendo le due dichiarazioni "if" con una singola istruzione "if-else".Tale ottimizzazione non sarebbe possibile filtrando ripetutamente su 'rdd'. –

+0

In un modo o nell'altro, per gli elementi N e le condizioni M, è ancora O (NM). Se RDD non fosse stato memorizzato nella cache, ci sarebbe stata un'enorme differenza pratica, ma per il resto non ha importanza. Ad ogni modo, se penso ancora che sia un approccio molto migliore rispetto a partizionare quindi l'upvote. – zero323

Problemi correlati