2016-03-09 12 views
5

Costruisco un RDD da un elenco di URL e quindi provo a recuperare i dati con alcune chiamate http asincrone. Ho bisogno di tutti i risultati prima di fare altri calcoli. Idealmente, ho bisogno di fare le chiamate http su nodi differenti per considerazioni di ridimensionamento.Lavoro spark con chiamata HTTP Async

ho fatto qualcosa di simile:

//init spark 
val sparkContext = new SparkContext(conf) 
val datas = Seq[String]("url1", "url2") 

//create rdd 
val rdd = sparkContext.parallelize[String](datas) 

//httpCall return Future[String] 
val requests = rdd.map((url: String) => httpCall(url)) 

//await all results (Future.sequence may be better) 
val responses = requests.map(r => Await.result(r, 10.seconds)) 

//print responses 
response.collect().foreach((s: String) => println(s)) 

//stop spark 
sparkContext.stop() 

Questo lavoro, ma Job della scintilla mai fine!

Quindi mi chiedo quali sono le migliori pratiche per affrontare Future usando Spark (o Future [RDD]).

Penso che questo caso d'uso sia piuttosto comune, ma non ho ancora trovato alcuna risposta.

migliori saluti

risposta

4

questo caso l'uso sembra piuttosto comune

Non proprio, perché semplicemente non funziona come che (probabilmente) aspetta. Poiché ogni attività opera su Scala Iterators standard, queste operazioni verranno compresse insieme. Significa che tutte le operazioni si bloccheranno nella pratica. Supponendo di avere tre URL [ "x", "z" "y"] si codice verrà eseguito in un seguente ordine:

Await.result(httpCall("x", 10.seconds)) 
Await.result(httpCall("y", 10.seconds)) 
Await.result(httpCall("z", 10.seconds)) 

Si può facilmente riprodurre lo stesso comportamento a livello locale. Se si desidera eseguire il codice in modo asincrono si dovrebbe gestire questo in modo esplicito utilizzando mapPartitions:

rdd.mapPartitions(iter => { 
    ??? // Submit requests 
    ??? // Wait until all requests completed and return Iterator of results 
}) 

ma questo è relativamente difficile. Non vi è alcuna garanzia che tutti i dati per una data partizione si adattino alla memoria, quindi probabilmente avrete anche bisogno di un meccanismo di batching.

Tutto ciò detto non è possibile riprodurre il problema che hai descritto è può essere un problema di configurazione o un problema con httpCall stesso.

Su una nota a margine che consente un singolo timeout per uccidere l'intero compito non sembra una buona idea.

1

Questo non funzionerà.

Non è possibile aspettarsi che gli oggetti di richiesta vengano distribuiti e le risposte raccolte su un cluster da altri nodi. Se lo fai, la scintilla per il futuro non finirà mai. Il futuro non funzionerà mai in questo caso.

Se la tua mappa() effettua richieste di sincronizzazione (http), ti preghiamo di raccogliere le risposte all'interno della stessa chiamata di azione/trasformazione e quindi sottoporre i risultati (risposte) a ulteriori mappe/riduzioni/altre chiamate.

Nel tuo caso, si prega di riscrivere la logica raccogliere le risposte per ogni chiamata sincronizzata e rimuovere la nozione di futuro, quindi tutto dovrebbe andare bene.

+0

Il problema non dovrebbe essere lo spostamento di dati tra 'richieste' e' risposte ', quindi entrambe le trasformazioni devono essere eseguite nello stesso stadio, quindi gli stessi esecutori e contesti. – zero323

1

Finalmente l'ho fatto usando scalaj-http invece di Dispatch. La chiamata è sincrona, ma corrisponde al mio caso d'uso.

Penso che lo Spark Job non abbia mai finito di usare Dispatch perché la connessione Http non è stata chiusa correttamente.

migliori saluti

1

Non riuscivo a trovare un modo semplice per raggiungere questo obiettivo. Ma dopo diverse ripetizioni di tentativi questo è ciò che ho fatto e il suo funzionamento per un enorme elenco di domande. Fondamentalmente lo abbiamo usato per eseguire un'operazione batch per una query enorme in più sottocapitoli.

// Break down your huge workload into smaller chunks, in this case huge query string is broken 
// down to a small set of subqueries 
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing 
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq) 

// Then map each one those to a Spark Task, in this case its a Future that returns a string 
val tasks: RDD[Future[String]] = queries.map(query => { 
    val task = makeHttpCall(query) // Method returns http call response as a Future[String] 
    task.recover { 
     case ex => logger.error("recover: " + ex.printStackTrace()) } 
    task onFailure { 
     case t => logger.error("execution failed: " + t.getMessage) } 
    task 
}) 

// Note:: Http call is still not invoked, you are including this as part of the lineage 

// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it 
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved 

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] => 
    val searchFuture: Future[Iterator[String]] = Future sequence f 
    Await.result(searchFuture, threadWaitTime.seconds) 
} 

// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage. 
// When you perform any action on that Rdd, then at that point, 
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and 
// collect those data in a single rdd. 

se non volete eseguire alcuna trasformazione sul contenuto, come l'analisi del payload di risposta, ecc Poi si potrebbe usare foreachPartition invece di mapPartitions di compiere tutti gli http chiamate immediatamente.

Problemi correlati