2015-09-10 10 views
24

Sulla base dei nostri esperimenti, vediamo che i costi di elaborazione interna dello Spark Streaming di stato richiedono una quantità significativa di tempo quando lo stato diventa più di un milione di oggetti. Di conseguenza, la latenza soffre, perché dobbiamo aumentare l'intervallo di batch per evitare comportamenti instabili (tempo di elaborazione> intervallo batch).Spark Streaming: perché i costi di elaborazione interni sono così elevati da gestire lo stato dell'utente di pochi MB?

Non ha nulla a che vedere con le specifiche della nostra app, poiché può essere riprodotto dal codice seguente.

Quali sono esattamente i costi di elaborazione/infrastruttura interni di Spark che richiedono così tanto tempo per gestire lo stato dell'utente? Esistono opzioni per ridurre i tempi di elaborazione oltre ad aumentare semplicemente l'intervallo di batch?

Abbiamo pianificato di utilizzare lo stato in modo esteso: almeno 100 MB o giù di lì su ciascuno di alcuni nodi per conservare tutti i dati in memoria e scaricarli solo una volta ogni ora.

L'aumento dell'intervallo batch aiuta, ma vogliamo mantenere l'intervallo batch minimo.

Probabilmente il motivo non è lo spazio occupato dallo stato, ma un grafico di oggetto piuttosto grande, perché quando abbiamo modificato l'elenco in una vasta serie di primitivi, il problema è scomparso.

Solo un'ipotesi: potrebbe avere a che fare con lo org.apache.spark.util.SizeEstimator utilizzato internamente da Spark, perché viene visualizzato di tanto in tanto durante la creazione del profilo.

enter image description here

è semplice demo per riprodurre l'immagine in alto a iCore7 moderna:

  • meno di 15 MB di stato
  • nessun input flusso affatto
  • più rapido possibile (fittizio) Funzione 'updateStateByKey'
  • intervallo batch 1 secondo
  • checkp mista (richiesto da Spark, deve avere) sul disco locale
  • testato sia localmente che su FILATO

Codice:

package spark; 

import org.apache.commons.lang3.RandomStringUtils; 
import org.apache.spark.HashPartitioner; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.util.SizeEstimator; 
import scala.Tuple2; 

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.List; 

public class SlowSparkStreamingUpdateStateDemo { 

    // Very simple state model 
    static class State implements Serializable { 
     final List<String> data; 
     State(List<String> data) { 
      this.data = data; 
     } 
    } 

    public static void main(String[] args) { 
     SparkConf conf = new SparkConf() 
       // Tried KryoSerializer, but it does not seem to help much 
       //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
       .setMaster("local[*]") 
       .setAppName(SlowSparkStreamingUpdateStateDemo.class.getName()); 

     JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1)); 
     javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation) 

     List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData(); 
     System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData)); 
     JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData); 

     JavaPairDStream<String, State> stream = javaStreamingContext 
       .textFileStream(".") // fake: effectively, no input at all 
       .mapToPair(input -> (Tuple2<String, State>) null) // fake to get JavaPairDStream 
       .updateStateByKey(
         (inputs, maybeState) -> maybeState, // simplest possible dummy function 
         new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()), 
         initialRdd); // set generated state 

     stream.foreachRDD(rdd -> { // simplest possible action (required by Spark) 
      System.out.println("Is empty: " + rdd.isEmpty()); 
      return null; 
     }); 

     javaStreamingContext.start(); 
     javaStreamingContext.awaitTermination(); 
    } 

    private static List<Tuple2<String, State>> prepareInitialRddData() { 
     // 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize' 
     int stateCount = 1000; 
     int dataListSize = 200; 
     int elementDataSize = 10; 
     List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount); 
     for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) { 
      List<String> stateData = new ArrayList<>(dataListSize); 
      for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) { 
       stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize)); 
      } 
      initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData))); 
     } 
     return initialRddInput; 
    } 

} 
+1

Hai provato a porre la domanda nella mailing list? – eliasah

+0

@eliasah Ho [duplicato] (http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Why-internal-processing-costs-are-so-high-to-handle -user-state-of-a-few-MB-td24640.html) lì ieri –

+0

Hai fatto un benchmark su diversi serializzatori? – eliasah

risposta

2

gestione dello Stato è stata migliorata nella scintilla 1.6.
fare riferimento a SPARK-2629 Gestione dello stato migliorata per Spark Streaming;

E nella progettazione di dettaglio spec:
svantaggio Improved state management in Spark Streaming

Una performance è metioned come di seguito:

Necessità di gestione dello stato più ottimizzato che non eseguire la scansione ogni chiave updateStateByKey attuale scansione ogni chiave in ogni intervallo di batch, anche se non ci sono dati per quella chiave. Sebbene questa semantica sia utile sono alcuni carichi di lavoro, la maggior parte dei carichi di lavoro richiede solo la scansione e l'aggiornamento dello stato per il quale ci sono nuovi dati. E solo una piccola percentuale di tutto lo stato deve essere toccata per quello in ogni intervallo di batch.The cogroup-based implementation of updateStateByKey is not designed for this; cogroup scans all the keys every time. In fact, this causes the batch processing times of updateStateByKey to increase with the number of keys in the state, even if the data rate stays fixed. enter image description here

Problemi correlati