2014-11-12 19 views
7

Sto implementando un algoritmo che può essere facilmente parallelizzato ma non è possibile capire come creare un numero adeguato di futuri e come abortire in anticipo. Al momento il contorno del codice è lungo queste lineeUso corretto dei futures nei calcoli paralleli

def solve: Boolean = { 
    var result = false 
    while(!result && i < iterations) { 
    val futures = (1 to threads) map { _ => solveIter(geInitialValues()) } 
    val loopResult = Future.fold(futures)(false)((acc, r) => acc || r) 
    result = Await.result(loopResult, Duration.Inf) 
    i+=1 
    } 
} 

def solveIter(initialValues: Values): Future[Boolean] = Future { 
    /* Takes a lot of time */ 
} 

Il problema evidente è impostato esplicitamente livello di parallelismo che può o non può essere adatto per contesto di esecuzione corrente. Se tutti i futuri vengono creati in una sola volta, come fare in modo che Future.fold termini prima?

+0

cosa intendevi con 'come creare un numero adeguato di futuri' e quando 'abortire presto'? - interrompere se il primo futuro restituisce true? –

+0

Sì, non mi piace il fatto che io stia arbitrariamente dividendo l'intero compito che ha unità di lavoro naturali e che 'fold' aspetta che tutte le attività siano completate. – synapse

+0

Quindi, vuoi iniziare n calcoli di solveIter e non appena il primo ritorna vero, vuoi abortire gli altri e iniziare la prossima iterazione? Dovresti cercare callback o attori per farlo ... –

risposta

2

Non è possibile annullare un futuro perché i future sono di sola lettura. Ma puoi usare una promessa che è "la parte scritta da un futuro".

codice Esempio:

  • questo codice timeout dopo 5 secondi perché il futuro non sono fatto (solveIter mai completata)
  • per completare una promessa, rimuovere commento che aggiunge una promessa completato alle 'promesse' - gli altri futuri saranno annullare
  • remove 'promises.foreach (_ trySuccess (false).)' e si ottiene ancora una volta il timeout, perché gli altri futuri non ricevendo annullare

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future, Promise} 
import scala.util.Try 


// create a bunch of promises 
val promises = ((1 to 10) map { _ => 
    val p = Promise[Boolean]() 
    p.completeWith(solveIter()) 
    p 
}) // :+ Promise().success(true) 
// ^^ REMOVE THIS COMMENT TO ADD A PROMISE WHICH COMPLETES 

// get the futures from the promises 
val futures = promises.map(_.future) 

// loop over all futures 
futures.foreach(oneFuture => 
    // register callback when future is done 
    oneFuture.foreach{ 
    case true => 
     println("future with 'true' result found") 

     // stop others 
     promises.foreach(_.trySuccess(false)) 

    case _ => // future completes with false 
    }) 



// wait at most 5 seconds till all futures are done 
Try(Await.ready(Future.sequence(futures), 5.seconds)).recover { case _ => 
    println("TIMEOUT") 
} 


def solveIter(): Future[Boolean] = Future { 
    /* Takes a VERY VERY VERY .... lot of time */ 
    Try(Await.ready(Promise().future, Duration.Inf)) 
    false 
}