2013-06-30 14 views
6

Esiste un modo semplice per utilizzare le collezioni parallele scala senza caricare una raccolta completa in memoria?Elaborazione parallela di raccolta di dati di dimensioni superiori alla memoria

Ad esempio, ho una grande collezione e mi piacerebbe eseguire una particolare operazione (fold) in parallelo solo su un piccolo blocco, che si adatta alla memoria, che su un altro blocco e così via, e infine ricombinare i risultati di tutti i pezzi

So che gli attori potrebbero essere utilizzati, ma sarebbe davvero bello usare le raccolte par.

ho scritto una soluzione, ma non è bello:

def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = { 
    new Iterator[Iterable[A]] { 
     var rest = list 
     def hasNext = !rest.isEmpty 
     def next = { 
     val chunk = rest.take(chunkSize) 
     rest = rest.drop(chunkSize) 
     chunk 
     } 
    }.toIterable 
    }            

    def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = { 
    val chunks: Iterable[Iterable[A]] = split(list, chunkSize) 
    def combineChunk: ((A,Iterable[A]) => A) = { case (res, entries) => entries.par.fold(res)(combine) } 
    chunks.foldLeft(acc)(combineChunk) 
    }            

    val chunkSize = 10000000       
    val x = 1 to chunkSize*10     

    def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n } 

    foldPar(0)(x,chunkSize,sum) 
+1

direi che corretto modello di calcolo qui sarà * * map ridurre (e quindi potrebbe essere [Spark] (http://spark-project.org/examples/)), non gli attori di per sé. –

+0

Formalmente - sì, ma il tempo di elaborazione non è sensato in questo caso quindi è del tutto normale eseguire su una singola macchina. –

risposta

4

La tua idea è molto pulito ed è un peccato non esiste una funzione disponibile già (per quanto ne so).

Ho appena riformulato la tua idea in un codice un po 'più breve. Innanzitutto, ritengo che per il folding parallelo sia utile utilizzare il concetto di monoid: è una struttura con un'operazione associativa e un elemento zero. L'associatività è importante perché non conosciamo l'ordine in cui combiniamo i risultati calcolati in parallelo. E l'elemento zero è importante in modo che possiamo dividere i calcoli in blocchi e iniziare a piegarli ognuno dallo zero. Non c'è nulla di nuovo, tuttavia, è proprio quello che si aspetta fold per le collezioni di Scala.

// The function defined by Monoid's apply must be associative 
// and zero its identity element. 
trait Monoid[A] 
    extends Function2[A,A,A] 
{ 
    val zero: A 
} 

successivo, di Scala Iterator s hanno già un metodo utile grouped(Int): GroupedIterator[Seq[A]] che taglia all'iteratore in sequenze di dimensione fissa. È abbastanza simile al tuo split. Ciò ci permette di tagliare l'ingresso in blocchi di dimensione fissa e quindi applicare metodi di raccolta parallele di Scala sopra:

def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A = 
    c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid)) 
         .fold(monoid.zero)(monoid); 

Pieghiamo ciascun blocco utilizzando il framework collezioni parallele e poi (senza parallelizzazione) combinare i risultati intermedi.

Un esempio:

// Example: 
object SumMonoid extends Monoid[Long] { 
    override val zero: Long = 0; 
    override def apply(x: Long, y: Long) = x + y; 
} 
val it = Iterator.range(1, 10000001).map(_.toLong) 
println(parFold(it, 100000)(SumMonoid)); 
+0

Bel uso di monoidi, non si sa mai prima. Per quanto riguarda il metodo raggruppato, dubito che possa caricare tutta la roba in memoria, ma si spegne perché non lo fa. –

+0

Testerò la tua soluzione un po 'più tardi, ma sembra che dovrebbe funzionare ed è molto più conciso. Grazie molto! –

+0

@MikhailGolubtsov Per favore fatemi sapere come va il vostro test, sono anche curioso. Ho fatto solo alcuni test di base. –

Problemi correlati