2015-04-27 13 views
30

Sto cercando di capire come funziona il cache di Spark.Capire il caching di Spark

Qui è la mia comprensione ingenuo, per favore fatemelo sapere se mi manca qualcosa:

val rdd1 = sc.textFile("some data") 
rdd1.cache() //marks rdd1 as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

In quanto sopra, rdd1 sarà caricato da disco (ad esempio HDFS) solo una volta. (quando rdd2 viene salvato presumo) e poi dalla cache (supponendo che ci sia abbastanza RAM) quando viene salvato rdd3)

Ora ecco la mia domanda. Diciamo che voglio memorizzare nella cache rdd2 e rdd3 poiché saranno entrambi utilizzati in seguito, ma non ho bisogno di rdd1 dopo averli creati.

Fondamentalmente c'è la duplicazione, non è vero? Poiché una volta calcolati rdd2 e rdd3, non ho più bisogno di rdd1, probabilmente dovrei eliminarlo, giusto? la domanda è quando?

Funzionerà? (Opzione A)

val rdd1 = sc.textFile("some data") 
rdd1.cache() // marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.cache() 
rdd3.cache() 
rdd1.unpersist() 

Vuol scintilla aggiungere la chiamata unpersist al DAG? o è fatto immediatamente? se è fatto immediatamente, quindi fondamentalmente rdd1 non verrà memorizzato nella cache quando leggo da rdd2 e rdd3, giusto?

Devo farlo in questo modo invece (Opzione B)?

val rdd1 = sc.textFile("some data") 
rdd1.cache() // marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 

rdd2.cache() 
rdd3.cache() 

rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

rdd1.unpersist() 

Quindi la domanda è questa: è l'opzione A abbastanza buono? Ad esempio, rdd1 caricherà ancora il file solo una volta? O devo andare con l'opzione B?

risposta

19

Sembrerebbe che sia richiesta l'opzione B. Il motivo è legato al modo in cui persistono/cache e unpersist vengono eseguiti da Spark. Poiché le trasformazioni RDD si limitano a creare descrizioni DAG senza esecuzione, nell'opzione A quando si chiama unpersist, si hanno solo descrizioni del lavoro e non un'esecuzione in esecuzione.

Ciò è rilevante in quanto una chiamata cache o persist aggiunge semplicemente l'RDD a una mappa di RDD contrassegnati come permanenti durante l'esecuzione del lavoro. Tuttavia, unpersist dice direttamente a blockManager di rimuovere l'RDD dalla memoria e rimuove il riferimento nella mappa degli RDD persistenti.

persist function

unpersist function

modo che avrebbe bisogno di chiamare unpersist dopo Spark effettivamente eseguito e memorizzato il RDD con il gestore del blocco.

I commenti per il RDD.persist metodo di suggerimento in questa direzione: rdd.persist

+1

Sì, sembra siete su esso. Questo è un po 'sfortunato, vorrei che la "cache" fosse stata convertita in un'operazione DAG e non solo aggiungere l'ID RDD a una mappa ... ci sono molti casi in cui si desidera memorizzare qualcosa in modo intermittente, creare un nuovo RDD, quindi rilascia quello vecchio. Forse ci sono buone ragioni teoriche sul perché questa non è una buona idea però ... in ogni caso, l'ordinamento della cache LRU (presumo) significa che il vecchio rdd1 inutilizzato sarà sfrattato se rdd2 e rdd3 hanno bisogno di quello spazio per il caching ... –

+0

Quindi, per la maggior parte, ho semplicemente esaminato cosa stanno facendo persist/cache e unpersist, ma c'è ancora spazio per considerare cosa sta facendo Spark quando si ricava un RDD da un altro e come potrebbe comunque ottimizzare. Non sono sicuro che 'rdd1' abbia bisogno di essere memorizzato nella cache, potrebbe essere controllato da' rdd2' e 'rdd3' quando sono memorizzati nella cache o quando il DAG è pipeline. Questo è più di un'area grigia per me però. – Rich

+2

Ha effettuato un po 'più di ricerca e tracciamento attraverso il debugger. 'rdd2' e' rdd3' farà riferimento a 'rdd1' come dipendenza. 'rdd1' caricherà i suoi dati in partizioni una volta sulla prima azione eseguita. Ora a questo punto 'rdd2' e' rdd3' applicano entrambe le loro trasformazioni ai dati già caricati da 'rdd1' nelle partizioni. Credo che il caching fornisca valore se si eseguono più azioni sullo stesso RDD esatto, ma in questo caso di nuovi RDD ramificati non penso che si verifichi lo stesso problema perché credo che Spark sia consapevole del fatto che 'rdd1' è ancora un dipendenza per 'rdd3' dopo il primo salvataggio. – Rich

2

In opzione A, non hanno dimostrato quando si chiama l'azione (chiamata per salvare)

val rdd1 = sc.textFile("some data") 
rdd.cache() //marks rdd as cached 
val rdd2 = rdd1.filter(...) 
val rdd3 = rdd1.map(...) 
rdd2.cache() 
rdd3.cache() 
rdd1.unpersist() 
rdd2.saveAsTextFile("...") 
rdd3.saveAsTextFile("...") 

Se la sequenza è come sopra, l'opzione A dovrebbe usare la versione cache di rdd1 per calcolare sia rdd2 che rdd 3

+0

Dovrei, sono d'accordo, ma lo farebbe? Penso che non lo sarebbe, come quando si chiama rdd2.saveAsTestFile ecc, rdd1 è già contrassegnato come non persistente. il persist/unpersist non è sul DAG –

+0

fino a quando non si chiama saveAsFile, niente ** veramente ** accade ..... quindi il mio punto è l'ordine della chiamata rdd1.unpersist non importa se rdd2 è già memorizzato nella cache –

0

L'opzione B è un approccio ottimale con piccoli ritocchi. Utilizzare metodi di azione meno costosi. Nell'approccio menzionato dal tuo codice, saveAsTextFile è un'operazione costosa, sostituiscila con il metodo count.

idea è quella di rimuovere il grande rdd1 da DAG, se non è rilevante per l'ulteriore calcolo (dopo rdd2 e rdd3 sono creati)

approccio Aggiornato dal codice

val rdd1 = sc.textFile("some data").cache() 
val rdd2 = rdd1.filter(...).cache() 
val rdd3 = rdd1.map(...).cache() 

rdd2.count 
rdd3.count 

rdd1.unpersist()