Sto lavorando a un progetto di streaming Scala (2.11)/Spark (1.6.1) e utilizzando mapWithState()
per tenere traccia dei dati visti dai batch precedenti.Spark Streaming mapWithState sembra ricostruire periodicamente lo stato completo
Lo stato è distribuito in 20 partizioni su più nodi, creato con StateSpec.function(trackStateFunc _).numPartitions(20)
. In questo stato abbiamo solo pochi tasti (~ 100) mappati a Sets
con circa ~ 160.000 voci, che crescono in tutta l'applicazione. L'intero stato è pari a 3GB
, che può essere gestito da ciascun nodo nel cluster. In ciascun batch, alcuni dati vengono aggiunti a uno stato ma non vengono eliminati fino alla fine del processo, ovvero ~ 15 minuti.
Mentre si segue l'interfaccia utente dell'applicazione, il tempo di elaborazione di ogni 10 batch è molto elevato rispetto agli altri batch. Vedere immagini:
I campi gialli rappresentano il massimo storico di lavorazione.
Una vista più dettagliata del lavoro mostra che in questi lotti verificarsi ad un certo punto, esattamente quando tutti i 20 partizioni sono "saltati". O questo è quello che dice l'interfaccia utente.
mia comprensione di skipped
è che ogni partizione Stato è uno dei possibili compiti che non viene eseguito, in quanto non ha bisogno di essere ricalcolato. Tuttavia, non capisco perché la quantità di skips
varia in ogni Job e perché l'ultimo Job richiede così tante elaborazioni. Il tempo di elaborazione più elevato si verifica indipendentemente dalle dimensioni dello stato, influisce solo sulla durata.
Si tratta di un bug nella funzionalità mapWithState()
o è questo comportamento previsto? La struttura dati sottostante richiede un qualche tipo di rimpasto, lo Set
nello stato deve copiare i dati? O è più probabile che sia un difetto nella mia applicazione?
Nel mio caso, posso vedere che ogni lavoro è controllato in il lotto. Perché non solo l'ultimo lavoro? Qual è la soluzione per tenere d'occhio le dimensioni dello stato? Per essere in grado di ottimizzarlo. – crak
@crak Qual è il tuo intervallo di checkpoint? E come vedi che ogni posto di lavoro controlla i dati? –
Ogni 10 lotti. Il mio occhio era un abuso, ho 12 lavori su 16 che fanno checkpoint. Ed è logico, ho 12 mapWithState, posso vedere lì l'impronta in spark ui. Ma senza sapere quale ha più dimensioni. mapWithState store semplicemente non è come il precedente impianto? – crak