Sto cercando di capire come funziona il cache di Spark.Capire il caching di Spark
Qui è la mia comprensione ingenuo, per favore fatemelo sapere se mi manca qualcosa:
val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
In quanto sopra, rdd1 sarà caricato da disco (ad esempio HDFS) solo una volta. (quando rdd2 viene salvato presumo) e poi dalla cache (supponendo che ci sia abbastanza RAM) quando viene salvato rdd3)
Ora ecco la mia domanda. Diciamo che voglio memorizzare nella cache rdd2 e rdd3 poiché saranno entrambi utilizzati in seguito, ma non ho bisogno di rdd1 dopo averli creati.
Fondamentalmente c'è la duplicazione, non è vero? Poiché una volta calcolati rdd2 e rdd3, non ho più bisogno di rdd1, probabilmente dovrei eliminarlo, giusto? la domanda è quando?
Funzionerà? (Opzione A)
val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
Vuol scintilla aggiungere la chiamata unpersist al DAG? o è fatto immediatamente? se è fatto immediatamente, quindi fondamentalmente rdd1 non verrà memorizzato nella cache quando leggo da rdd2 e rdd3, giusto?
Devo farlo in questo modo invece (Opzione B)?
val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
rdd1.unpersist()
Quindi la domanda è questa: è l'opzione A abbastanza buono? Ad esempio, rdd1
caricherà ancora il file solo una volta? O devo andare con l'opzione B?
Sì, sembra siete su esso. Questo è un po 'sfortunato, vorrei che la "cache" fosse stata convertita in un'operazione DAG e non solo aggiungere l'ID RDD a una mappa ... ci sono molti casi in cui si desidera memorizzare qualcosa in modo intermittente, creare un nuovo RDD, quindi rilascia quello vecchio. Forse ci sono buone ragioni teoriche sul perché questa non è una buona idea però ... in ogni caso, l'ordinamento della cache LRU (presumo) significa che il vecchio rdd1 inutilizzato sarà sfrattato se rdd2 e rdd3 hanno bisogno di quello spazio per il caching ... –
Quindi, per la maggior parte, ho semplicemente esaminato cosa stanno facendo persist/cache e unpersist, ma c'è ancora spazio per considerare cosa sta facendo Spark quando si ricava un RDD da un altro e come potrebbe comunque ottimizzare. Non sono sicuro che 'rdd1' abbia bisogno di essere memorizzato nella cache, potrebbe essere controllato da' rdd2' e 'rdd3' quando sono memorizzati nella cache o quando il DAG è pipeline. Questo è più di un'area grigia per me però. – Rich
Ha effettuato un po 'più di ricerca e tracciamento attraverso il debugger. 'rdd2' e' rdd3' farà riferimento a 'rdd1' come dipendenza. 'rdd1' caricherà i suoi dati in partizioni una volta sulla prima azione eseguita. Ora a questo punto 'rdd2' e' rdd3' applicano entrambe le loro trasformazioni ai dati già caricati da 'rdd1' nelle partizioni. Credo che il caching fornisca valore se si eseguono più azioni sullo stesso RDD esatto, ma in questo caso di nuovi RDD ramificati non penso che si verifichi lo stesso problema perché credo che Spark sia consapevole del fatto che 'rdd1' è ancora un dipendenza per 'rdd3' dopo il primo salvataggio. – Rich