2014-09-08 4 views
6

Cosa fare se, quando si attraversa l'RDD, devo calcolare i valori nel set di dati chiamando il servizio esterno (bloccante)? Come pensi che potrebbe essere raggiunto?Come si esegue il blocco dell'IO in processi spark di apache?

valori val: Future[RDD[Double]] = Future sequence tasks

ho cercato di creare un elenco di Futures, ma come RDD id non Traversable, Future.sequence non è adatto.

Mi chiedo solo se qualcuno avesse avuto un problema simile e come lo hai risolto? Quello che sto cercando di ottenere è ottenere un parallelismo su un singolo nodo di lavoro, quindi posso chiamare quel servizio esterno volte per secondo.

Probabilmente, c'è un'altra soluzione, più adatta per la scintilla, come avere più nodi di lavoro su un singolo host.

È interessante sapere come affronta questa sfida? Grazie.

+0

Che tipo di valore è necessario calcolare? È possibile che possa essere calcolato offline e unito al tuo set di dati? O che il codice remoto potrebbe essere tirato dentro come un barattolo e processato in-process? – DPM

+0

Il valore viene calcolato confrontando l'input fornito e ogni elemento dal RDD. Così ho attraversato RDD e ho confrontato ogni elemento. Comarisson è una chiamata bloccante, perché è nascosta in un componente nativo. Ecco perché mi chiedo, come lo faresti e se avessi questa sfida? Apprezzo molto il tuo aiuto. –

risposta

3

Ecco la risposta alla mia domanda:

val buckets = sc.textFile(logFile, 100) 
val tasks: RDD[Future[Object]] = buckets map { item => 
    future { 
    // call native code 
    } 
} 

val values = tasks.mapPartitions[Object] { f: Iterator[Future[Object]] => 
    val searchFuture: Future[Iterator[Object]] = Future sequence f 
    Await result (searchFuture, JOB_TIMEOUT) 
} 

L'idea qui è, che otteniamo la raccolta delle partizioni, dove ogni partizione viene inviato al lavoratore specifica ed è il pezzo più piccolo di lavoro. Ogni pezzo di lavoro contiene dati che potrebbero essere elaborati chiamando il codice nativo e inviando i dati.

La raccolta 'valori' contiene i dati, che vengono restituiti dal codice nativo e che il lavoro viene eseguito attraverso il cluster.

+0

Stiamo affrontando lo stesso problema in questo momento. Hai un esempio di come viene utilizzato il compito? Grazie, –

+0

La scintilla fornisce automaticamente un implicito ExecutionContext per l'esecuzione dei tuoi futuri? – advait

1

In base alla tua risposta, che la chiamata di blocco è quella di confrontare l'ingresso fornito con ogni singolo elemento nel RDD, vorrei fortemente riscrivere il confronto in java/scala in modo che possa essere eseguito come parte del processo di scintilla. Se il confronto è una funzione "pura" (senza effetti collaterali, dipende solo dai suoi input), dovrebbe essere semplice da implementare nuovamente e la diminuzione della complessità e l'aumento della stabilità del processo di scintilla dovuto al non dover effettuare remote probabilmente le chiamate ne valuteranno la pena.

Sembra improbabile che il servizio remoto sarà in grado di gestire 3000 chiamate al secondo, quindi sarebbe preferibile una versione in-process locale.

Se questo è assolutamente impossibile per qualche motivo, allora si potrebbe essere in grado di creare una trasformazione RDD, che trasforma i dati in un RDD di futures, in pseudo-codice:

val callRemote(data:Data):Future[Double] = ... 

val inputData:RDD[Data] = ... 

val transformed:RDD[Future[Double]] = inputData.map(callRemote) 

E poi continuare dal li, calcolando gli oggetti del tuo futuro [doppio].

Se si conosce il parallelismo che il processo remoto può gestire, è consigliabile abbandonare la modalità Futuro e accettare che si tratti di una risorsa collo di bottiglia.

Il lavoro richiederà probabilmente un po 'di tempo, ma non dovrebbe inondare il servizio remoto e morire in modo orribile.

Un'opzione finale è che se gli input sono ragionevolmente prevedibili e l'intervallo di risultati è coerente e limitato a un numero ragionevole di output (milioni o giù di lì), è possibile precomputerli tutti come un set di dati utilizzando il servizio remoto e trovarli a scintilla tempo di lavoro utilizzando un join.

Problemi correlati