2016-02-18 9 views
6

Vorrei creare un RDD per raccogliere i risultati di un calcolo iterativo.Creazione di un RDD per raccogliere i risultati di un calcolo iterativo

Come posso utilizzare un ciclo (o alternativo) per sostituire il codice seguente:

import org.apache.spark.mllib.random.RandomRDDs._  

val n = 10 

val step1 = normalRDD(sc, n, seed = 1) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong) 
val result2 = result1.zip(step3) 

... 

val step50 = normalRDD(sc, n, seed = (step49.max).toLong) 
val result49 = result48.zip(step50) 

(creando le RDDs step N e comprimendo poi insieme alla fine sarebbe anche bene fintanto 50 RDDs vengono creati in modo iterativo per rispettare il seme = (passo (n-1 .max)) condizione)

+0

userei 'Stream.unfold' da scalaz per generare un flusso di passaggi e quindi comprimerlo con se stesso e/o scanRight .. – Reactormonk

risposta

6

una funzione ricorsiva avrebbe funzionato:

/** 
* The return type is an Option to handle the case of a user specifying 
* a non positive number of steps. 
*/ 
def createZippedNormal(sc : SparkContext, 
         numPartitions : Int, 
         numSteps : Int) : Option[RDD[Double]] = { 

    @scala.annotation.tailrec 
    def accum(sc : SparkContext, 
      numPartitions : Int, 
      numSteps : Int, 
      currRDD : RDD[Double], 
      seed : Long) : RDD[Double] = { 
    if(numSteps <= 0) currRDD 
    else { 
     val newRDD = normalRDD(sc, numPartitions, seed) 
     accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max) 
    } 
    } 

    if(numSteps <= 0) None 
    else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L)) 
} 
+0

La ricorsione della coda non ti proteggerà dal lignaggio RDD che soffia lo stack :) – zero323

+0

@ zero323 Concordato. Tuttavia, questo problema è inerente ai requisiti della domanda. Qualsiasi risposta avrebbe un problema simile. –

+0

Volevo solo sottolineare che stai costruendo una struttura dati ricorsiva dietro le quinte che non sarà ottimizzata in coda. Niente di più :) E in realtà è possibile risolverlo ed evitare il problema utilizzando i checkpoint. È persino risolvibile senza una singola zip :) – zero323

Problemi correlati