2016-03-16 15 views
14

Sto lavorando a un progetto di streaming Scala (2.11)/Spark (1.6.1) e utilizzando mapWithState() per tenere traccia dei dati visti dai batch precedenti.Spark Streaming mapWithState sembra ricostruire periodicamente lo stato completo

Lo stato è distribuito in 20 partizioni su più nodi, creato con StateSpec.function(trackStateFunc _).numPartitions(20). In questo stato abbiamo solo pochi tasti (~ 100) mappati a Sets con circa ~ 160.000 voci, che crescono in tutta l'applicazione. L'intero stato è pari a 3GB, che può essere gestito da ciascun nodo nel cluster. In ciascun batch, alcuni dati vengono aggiunti a uno stato ma non vengono eliminati fino alla fine del processo, ovvero ~ 15 minuti.

Mentre si segue l'interfaccia utente dell'applicazione, il tempo di elaborazione di ogni 10 batch è molto elevato rispetto agli altri batch. Vedere immagini:

The spikes show the higher processing time.

I campi gialli rappresentano il massimo storico di lavorazione.

enter image description here

Una vista più dettagliata del lavoro mostra che in questi lotti verificarsi ad un certo punto, esattamente quando tutti i 20 partizioni sono "saltati". O questo è quello che dice l'interfaccia utente.

enter image description here

mia comprensione di skipped è che ogni partizione Stato è uno dei possibili compiti che non viene eseguito, in quanto non ha bisogno di essere ricalcolato. Tuttavia, non capisco perché la quantità di skips varia in ogni Job e perché l'ultimo Job richiede così tante elaborazioni. Il tempo di elaborazione più elevato si verifica indipendentemente dalle dimensioni dello stato, influisce solo sulla durata.

Si tratta di un bug nella funzionalità mapWithState() o è questo comportamento previsto? La struttura dati sottostante richiede un qualche tipo di rimpasto, lo Set nello stato deve copiare i dati? O è più probabile che sia un difetto nella mia applicazione?

risposta

9

Si tratta di un bug nella funzionalità mapWithState() o è inteso come comportamento ?

Questo comportamento è previsto. I picchi che stai vedendo sono perché i tuoi dati ricevono il checkpoint alla fine di quel determinato lotto. Se noterai il tempo nei batch più lunghi, vedrai che succede costantemente ogni 100 secondi. Questo perché il tempo del checkpoint è costante e viene calcolato per il tuo batchDuration, che è la frequenza con cui si parla all'origine dati per leggere un batch moltiplicato per alcune costanti, a meno che non si imposti esplicitamente l'intervallo DStream.checkpoint.

Ecco il pezzo rilevante di codice da MapWithStateDStream:

override def initialize(time: Time): Unit = { 
    if (checkpointDuration == null) { 
    checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER 
    } 
    super.initialize(time) 
} 

Dove DEFAULT_CHECKPOINT_DURATION_MULTIPLIER è:

private[streaming] object InternalMapWithStateDStream { 
    private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10 
} 

Quali allinea esattamente con il comportamento che stai vedendo, dal momento che il periodo di lettura batch è ogni 10 secondi => 10 * 10 = 100 secondi.

Questo è normale, e questo è il costo dello stato persistente con Spark. Un'ottimizzazione al tuo fianco potrebbe essere quella di pensare a come ridurre al minimo la dimensione dello stato che devi conservare in memoria, affinché questa serializzazione sia il più veloce possibile. Inoltre, assicurati che i dati siano distribuiti su un numero sufficiente di esecutori, in modo che lo stato sia distribuito in modo uniforme tra tutti i nodi. Inoltre, spero che tu abbia attivato Kryo Serialization invece della serializzazione Java predefinita, che possa darti un significativo incremento delle prestazioni.

+0

Nel mio caso, posso vedere che ogni lavoro è controllato in il lotto. Perché non solo l'ultimo lavoro? Qual è la soluzione per tenere d'occhio le dimensioni dello stato? Per essere in grado di ottimizzarlo. – crak

+0

@crak Qual è il tuo intervallo di checkpoint? E come vedi che ogni posto di lavoro controlla i dati? –

+0

Ogni 10 lotti. Il mio occhio era un abuso, ho 12 lavori su 16 che fanno checkpoint. Ed è logico, ho 12 mapWithState, posso vedere lì l'impronta in spark ui. Ma senza sapere quale ha più dimensioni. mapWithState store semplicemente non è come il precedente impianto? – crak

1

Oltre alla risposta accettata, che indica il prezzo della serializzazione correlata al checkpoint, esiste un altro problema meno noto che potrebbe contribuire al comportamento di spikey: sfratto degli stati eliminati.

In particolare, gli stati "cancellato" o "scaduto" non vengono immediatamente rimossi dalla mappa, ma sono contrassegnati per la cancellazione e rimossi solo nel processo di serializzazione [in Spark 1.6.1, vedere writeObjectInternal()].

Questo ha due implicazioni di performance, che si verificano solo una volta ogni 10 lotti:

  1. Il processo di attraversamento e la cancellazione ha il suo prezzo
  2. Se si elabora il flusso di scaduta a/eventi cancellati, ad esempio persistere nell'archivio esterno, il costo associato per tutti i 10 lotti verrà pagato solo a questo punto (e non come ci si sarebbe potuto aspettare, su ciascun RDD)
Problemi correlati