Quando eseguo il codice come il seguente:Spark RDD checkpoint persistito RDDs/cache sta eseguendo il DAG due volte
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())
e guardare le varie fasi di Filati, noto che Spark sta facendo il calcolo DAG TWICE - - una volta per il distinto + count che materializza il RDD e lo memorizza nella cache, e quindi un SECONDO tempo per creare la copia del checkpoint.
Poiché il RDD è già presente e memorizzato nella cache, perché il checkpoint non sfrutta semplicemente questo e salva le partizioni memorizzate nella cache sul disco?
Esiste un modo esistente (una sorta di impostazione di configurazione o di modifica del codice) per forzare Spark a sfruttare questo e solo eseguire l'operazione UNA VOLTA e il checkpoint copierà solo le cose?
Devo "materializzare" due volte, invece?
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())
newRDD.checkpoint
print(newRDD.count())
Ho creato un biglietto Jira Apache Spark di rendere questa una richiesta di funzionalità: https://issues.apache.org/jira/browse/SPARK-8666