2016-02-01 10 views
25

Qual è la differenza tra il punto di controllo spark e il persistere su un disco. Sono entrambi questi store nel disco locale?Qual è la differenza tra il punto di controllo spark e il persistere su un disco

+0

È una domanda molto generica. Meglio sarebbe aggiungere un contesto intorno ad esso. Per rispondere alla tua domanda, può essere archiviato in qualsiasi Area di archiviazione persistente: DIsk locale o spazio su HDFS o NFS ecc. – Sumit

+3

@ Somma: questa è una domanda molto specifica sulle differenze tra due metodi Spark RDD. La risposta può essere obiettiva e mirata, come dimostra la risposta di zero323 qui sotto. –

risposta

29

Ci sono poche differenze importanti ma quella fondamentale è ciò che accade con il lignaggio. Persist/cache mantiene intatto il lignaggio mentre checkpoint interrompe il lignaggio. Consente di prendere in considerazione esempi seguenti:

import org.apache.spark.storage.StorageLevel 

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _) 
  • cache/persist:

    val indCache = rdd.mapValues(_ > 4) 
    indCache.persist(StorageLevel.DISK_ONLY) 
    
    indCache.toDebugString 
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] 
    // | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] 
    // +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] 
    //  | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated] 
    
    indCache.count 
    // 3 
    
    indCache.toDebugString 
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] 
    // |  CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B 
    // | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] 
    // +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] 
    //  | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated] 
    
  • checkpoint:

    val indChk = rdd.mapValues(_ > 4) 
    indChk.checkpoint 
    
    // indChk.toDebugString 
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] 
    // | ShuffledRDD[3] at reduceByKey at <console>:21 [] 
    // +-(8) MapPartitionsRDD[2] at map at <console>:21 [] 
    //  | ParallelCollectionRDD[1] at parallelize at <console>:21 [] 
    
    indChk.count 
    // 3 
    
    indChk.toDebugString 
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] 
    // | ReliableCheckpointRDD[12] at count at <console>:27 [] 
    

Come si può vedere nel primo caso l ineage viene mantenuto anche se i dati vengono recuperati dalla cache. Significa che i dati possono essere ricalcolati da zero se alcune partizioni di indCache vengono perse. Nel secondo caso il lignaggio viene completamente perso dopo il checkpoint e indChk non contiene più le informazioni necessarie per ricostruirlo.

checkpoint, a differenza di cache/persist viene calcolato separatamente dagli altri lavori. Ecco perché RDD contrassegnato per checkpoint dovrebbero essere memorizzate nella cache:

è fortemente raccomandato che questo RDD viene mantenuto in memoria, altrimenti salvandolo su un file richiederà ricalcolo.

Infine i dati checkpointed sono persistenti e non vengono rimossi dopo che SparkContext è stato eliminato.

Per la memorizzazione dei dati SparkContext.setCheckpointDir utilizzato da RDD.checkpoint richiede il percorso DFS se in esecuzione in modalità non locale. Altrimenti può essere anche un file system locale. localCheckpoint e persist senza replica dovrebbe utilizzare il file system locale.

Nota:

RDD checkpoint è un concetto diverso da un chekpointing in Spark Streaming. Il primo è stato progettato per risolvere il problema del lignaggio, il secondo riguarda l'affidabilità dello streaming e il ripristino degli errori.

1

Se si seleziona the relevant part of the documentation, si parla di scrittura di dati su un sistema affidabile, ad es. HDFS. Ma spetta a te dire ad Apache Spark dove scrivere le sue informazioni sul checkpoint.

D'altra parte, persistente riguarda la memorizzazione nella cache dei dati principalmente in memoria, come indica chiaramente this part of the documentation.

Quindi, dipende dalla directory che hai fornito ad Apache Spark.

+1

La persistenza nello streaming è un problema piuttosto diverso e non è strettamente correlata al caching. – zero323

12

Penso che si possa trovare una risposta molto dettagliata here

Mentre è molto difficile riassumere tutto in quella pagina, dirò

Persist

  • persista o caching con StorageLevel.DISK_ONLY causa che la generazione di RDD sia calcolata e memorizzata in una posizione tale che l'uso successivo di tale RDD non andrà oltre quei punti nel ricalcolare la linea.
  • Dopo la chiamata permanente, Spark ricorda ancora il lignaggio del RDD anche se non lo chiama.
  • In secondo luogo, dopo l'applicazione termina, la cache viene cancellata o il file distrutto

Checkpoint

  • negozi checkpoint RDD fisicamente HDFS e distrugge il lignaggio che lo ha creato.
  • Il file del checkpoint non verrà eliminato anche dopo la chiusura dell'applicazione Spark.
  • I file del punto di controllo possono essere utilizzati nella successiva esecuzione o nel programma del driver
  • Il checkpoint di un RDD causa un doppio calcolo perché l'operazione prima chiamerà una cache prima di eseguire il lavoro effettivo di elaborazione e scrittura nella directory del punto di controllo.

Si consiglia di leggere l'articolo per ulteriori dettagli o interni delle operazioni di checkpoint o cache di Spark.

1
  1. Persistenza (MEMORY_AND_DISK) memorizzerà il frame di dati sul disco e memoria temporanea senza rompere il lignaggio del programma cioè df.rdd.toDebugString() restituirà la stessa uscita. Si raccomanda di utilizzare persistono (*) su un calcolo, che sta per essere riutilizzati per evitare ricalcolo dei risultati intermedi:

    df = df.persist(StorageLevel.MEMORY_AND_DISK) 
    calculation1(df) 
    calculation2(df) 
    

    nota, che cache il frame di dati non garantisce, che rimarrà nella memoria fino a quando non lo chiami la prossima volta. A seconda dell'uso della memoria, la cache può essere scartata.

  2. checkpoint(), d'altra parte, interrompe il lignaggio e forza la memorizzazione dei dati sul disco. A differenza dell'uso di cache()/persist(), il controllo frequente può rallentare il programma. Si consiglia di utilizzare i checkpoint quando a) si lavora in un ambiente instabile per consentire il ripristino rapido dagli errori b) memorizzare gli stati intermedi di calcolo quando nuove voci del RDD dipendono dalle voci precedenti, ovvero evitare di ricalcolare una catena di dipendenze lunga in caso di errore

Problemi correlati