2015-09-10 15 views
6

Ho un RDD[String], wordRDD. Ho anche una funzione che crea un RDD [String] da una stringa/parola. Vorrei creare un nuovo RDD per ogni stringa in wordRDD. Qui sono i miei tentativi:Come creare la raccolta di RDD da RDD?

1) non è riuscita perché Spark non supporta RDDs nidificate:?

var newRDD = wordRDD.map(word => { 
    // execute myFunction() 
    (new MyClass(word)).myFunction() 
}) 

2) non è riuscita (forse a causa di problema di portata):

var newRDD = sc.parallelize(new Array[String](0)) 
val wordArray = wordRDD.collect 
for (w <- wordArray){ 
    newRDD = sc.union(newRDD,(new MyClass(w)).myFunction()) 
} 

Il mio risultato ideale sarebbe simile:

// input RDD (wordRDD) 
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...) 

// myFunction behavior 
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl') 

// after executing myFunction() on each word in wordRDD: 
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...) 

ho trovato qui una domanda rilevante: Spark when union a lot of RDD throws stack overflow error, ma non ha affrontato la mia problema.

risposta

3

Usa flatMap per ottenere RDD[String] come volete.

var allWords = wordRDD.flatMap { word => 
    (new MyClass(word)).myFunction().collect() 
} 
+1

Come si suppone debba funzionare in parallelo? Tutto ciò che accade all'interno di 'wordRDD.map' viene eseguito sul cluster. Pertanto, il 'collect' interno deve attivare un nuovo lavoro Spark da un lavoro in esecuzione. Sospetto che non funzionerà in modo distribuito. –

+0

Poteva anche modificare la funzione per restituire gli array anziché gli RDD, ma la domanda non specificava la funzione effettiva. –

+0

Ma la sua descrizione dice che ha una funzione, presumo che sia 'myFunction' che crea un' RDD [String] 'da una stringa/parola. –

3

Non è possibile creare un RDD da un altro RDD.

Tuttavia, è possibile riscrivere la funzione myFunction: String => RDD[String], che genera tutte le parole dall'ingresso in cui è stata rimossa una lettera, in un'altra funzione modifiedFunction: String => Seq[String] in modo che possa essere utilizzata all'interno di un RDD. In questo modo, verrà eseguito anche in parallelo sul tuo cluster. Con lo modifiedFunction è possibile ottenere l'RDD finale con tutte le parole semplicemente chiamando wordRDD.flatMap(modifiedFunction).

Il punto cruciale è quello di utilizzare flatMap (a map e flatten le trasformazioni):

def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 

    val input = sc.parallelize(Seq("apple", "ananas", "banana")) 

    // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...) 
    val result = input.flatMap(modifiedFunction) 
} 

def modifiedFunction(word: String): Seq[String] = { 
    word.indices map { 
    index => word.substring(0, index) + word.substring(index+1) 
    } 
}