2015-11-19 9 views
5

Vorrei un modo per restituire righe dal mio RDD uno alla volta (o in piccoli lotti) in modo da poter raccogliere le righe localmente come ho bisogno di loro. Il mio RDD è abbastanza grande da non poter essere inserito nella memoria sul nodo del nome, pertanto l'esecuzione di collect() causerebbe un errore.raccogliere RDD con buffer in pyspark

C'è un modo per ricreare l'operazione collect() ma con un generatore, in modo che le righe dall'RDD siano passate in un buffer? Un'altra opzione sarebbe quella di take() 100000 righe alla volta da un RDD memorizzato nella cache, ma non credo che take() ti consenta di specificare una posizione iniziale?

+1

C'è qualcosa che ti fa venir voglia di evitare "saveAsTextFile"? Perché puoi scaricare tutto su un file e poi leggerlo attraverso un buffer. –

+0

@ paul-k Attualmente utilizzo saveAsTextFile, tuttavia questo ha un paio di problemi: 1) il tempo di lettura è lento, perché questi sono file di testo e 2) Perdo informazioni sui tipi di dati, quindi '1' è lo stesso di 1 – mgoldwasser

+0

Questo è vero 2) è ancora un problema, ma puoi ancora scrivere informazioni sul tipo anche se questo non è molto economico in termini di dimensioni del file. puoi anche chiamare SaveAsPickleFile per serializzare oggetti. 1) Non penso che sarebbe più lento dell'attuale implementazione di 'collect' poiché legge da un file temporaneo secondo i doc: ps: //spark.apache.org/docs/0.7.2/api/pyspark /pyspark.rdd-pysrc.html#RDD.collect –

risposta

5

La migliore opzione disponibile è utilizzare RDD.toLocalIterator che raccoglie solo una singola partizione alla volta. Si crea un generatore standard di Python:

rdd = sc.parallelize(range(100000)) 
iterator = rdd.toLocalIterator() 
type(iterator) 

## generator 

even = (x for x in iterator if not x % 2) 

È possibile regolare quantità di dati raccolti in un unico batch usando un partizionamento specifico e di regolazione di un numero di partizioni.

Purtroppo viene fornito con un prezzo. Per raccogliere piccoli lotti devi avviare più lavori Spark ed è piuttosto costoso. Quindi, in generale, raccogliere un elemento al momento non è un'opzione.

+0

Volevo solo aggiungere una piccola nota, funziona perfettamente con 'glom()' se vuoi un iteratore che restituisce una lista per partizione. – numeral