2015-06-26 8 views
5

Quando eseguo il codice come il seguente:Spark RDD checkpoint persistito RDDs/cache sta eseguendo il DAG due volte

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER) 
newRDD.checkpoint 
print(newRDD.count()) 

e guardare le varie fasi di Filati, noto che Spark sta facendo il calcolo DAG TWICE - - una volta per il distinto + count che materializza il RDD e lo memorizza nella cache, e quindi un SECONDO tempo per creare la copia del checkpoint.

Poiché il RDD è già presente e memorizzato nella cache, perché il checkpoint non sfrutta semplicemente questo e salva le partizioni memorizzate nella cache sul disco?

Esiste un modo esistente (una sorta di impostazione di configurazione o di modifica del codice) per forzare Spark a sfruttare questo e solo eseguire l'operazione UNA VOLTA e il checkpoint copierà solo le cose?

Devo "materializzare" due volte, invece?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER) 
print(newRDD.count()) 

newRDD.checkpoint 
print(newRDD.count()) 

Ho creato un biglietto Jira Apache Spark di rendere questa una richiesta di funzionalità: https://issues.apache.org/jira/browse/SPARK-8666

risposta

Problemi correlati