2012-03-22 24 views
6

Ho scritto un'applicazione Scala (2.9.1-1) che deve elaborare diversi milioni di righe da una query di database. Io sono la conversione del ResultSet ad un Stream con la tecnica mostrata nella risposta ad una mia previous questions:Consumo di memoria di un flusso Scala parallelo

class Record(...) 

val resultSet = statement.executeQuery(...) 

new Iterator[Record] { 
    def hasNext = resultSet.next() 
    def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...) 
}.toStream.foreach { record => ... } 

e questo ha funzionato molto bene.

Poiché il corpo della chiusura foreach è molto CPU, e come testamento alla praticità di programmazione funzionale, se si aggiunge una .par prima della foreach, le chiusure ottenere condotte in parallelo con nessun altro sforzo, se non per fare sicuro che il corpo della chiusura sia thread-safe (è scritto in uno stile funzionale senza dati mutabili tranne che per la stampa su un log thread-safe).

Tuttavia, sono preoccupato per il consumo di memoria. È il .par che causa l'intero set di risultati da caricare nella RAM, oppure l'operazione parallela carica solo il maggior numero di righe dei thread attivi? Ho assegnato 4G alla JVM (64-bit con -Xmx4g) ma in futuro lo farò su più file e temo che alla fine avrò una memoria esaurita.

Esiste uno schema migliore per eseguire questo tipo di elaborazione parallela in modo funzionale? Ho mostrato questa applicazione ai miei colleghi come esempio del valore della programmazione funzionale e delle macchine multi-core.

+0

solo curiosi. Che DBMS stai usando e quale API Scala DB interrogarla? – santiagobasulto

+0

Accedo a un database Microsoft SQL Server 2012 in esecuzione su Windows Server 2008 R2 utilizzando il driver JDBC di Microsoft (http://msdn.microsoft.com/en-us/sqlserver/aa937724). – Ralph

risposta

4

Se si guarda al scaladoc of Stream, si noterà che la classe definizione di par è il trait Parallelizable ... e, se si guarda al source code of this trait, si noterà che prende ogni elemento della collezione originale e mettere li in un combinatore, quindi, si caricano ciascuna riga in una ParSeq:

def par: ParRepr = { 
    val cb = parCombiner 
    for (x <- seq) cb += x 
    cb.result 
    } 

    /** The default `par` implementation uses the combiner provided by this method 
    * to create a new parallel collection. 
    * 
    * @return a combiner for the parallel collection of type `ParRepr` 
    */ 
    protected[this] def parCombiner: Combiner[A, ParRepr] 

una possibile soluzione è quella di parallelizzare esplicitamente l'calcolo, grazie ad attori per esempio. Puoi dare un'occhiata a this example dalla documentazione di akka, ad esempio, che potrebbe essere utile nel tuo contesto.

+0

Avevo paura di ciò. Ho pensato di sparare una serie di thread e quindi di avere ciascuna riga di pull dal set di risultati (sincronizzato), ma non sembra una soluzione molto funzionale. – Ralph

+0

Chiedere a un attore di completare la query e generare un Router con un Resizer che si impegna a inserire blocchi. –

-1

La nuova biblioteca akka stream è la correzione che stai cercando:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Source, Sink} 

def iterFromQuery() : Iterator[Record] = { 
    val resultSet = statement.executeQuery(...) 
    new Iterator[Record] { 
    def hasNext = resultSet.next() 
    def next = new Record(...) 
    } 
} 

def cpuIntensiveFunction(record : Record) = { 
... 
} 

implicit val actorSystem = ActorSystem() 
implicit val materializer = ActorMaterializer() 
implicit val execContext = actorSystem.dispatcher 

val poolSize = 10 //number of Records in memory at once 

val stream = 
    Source(iterFromQuery).runWith(Sink.foreachParallel(poolSize)(cpuIntensiveFunction)) 

stream onComplete {_ => actorSystem.shutdown()} 
Problemi correlati