Sulla base dei nostri esperimenti, vediamo che i costi di elaborazione interna dello Spark Streaming di stato richiedono una quantità significativa di tempo quando lo stato diventa più di un milione di oggetti. Di conseguenza, la latenza soffre, perché dobbiamo aumentare l'intervallo di batch per evitare comportamenti instabili (tempo di elaborazione> intervallo batch).Spark Streaming: perché i costi di elaborazione interni sono così elevati da gestire lo stato dell'utente di pochi MB?
Non ha nulla a che vedere con le specifiche della nostra app, poiché può essere riprodotto dal codice seguente.
Quali sono esattamente i costi di elaborazione/infrastruttura interni di Spark che richiedono così tanto tempo per gestire lo stato dell'utente? Esistono opzioni per ridurre i tempi di elaborazione oltre ad aumentare semplicemente l'intervallo di batch?
Abbiamo pianificato di utilizzare lo stato in modo esteso: almeno 100 MB o giù di lì su ciascuno di alcuni nodi per conservare tutti i dati in memoria e scaricarli solo una volta ogni ora.
L'aumento dell'intervallo batch aiuta, ma vogliamo mantenere l'intervallo batch minimo.
Probabilmente il motivo non è lo spazio occupato dallo stato, ma un grafico di oggetto piuttosto grande, perché quando abbiamo modificato l'elenco in una vasta serie di primitivi, il problema è scomparso.
Solo un'ipotesi: potrebbe avere a che fare con lo org.apache.spark.util.SizeEstimator
utilizzato internamente da Spark, perché viene visualizzato di tanto in tanto durante la creazione del profilo.
è semplice demo per riprodurre l'immagine in alto a iCore7 moderna:
- meno di 15 MB di stato
- nessun input flusso affatto
- più rapido possibile (fittizio) Funzione 'updateStateByKey'
- intervallo batch 1 secondo
- checkp mista (richiesto da Spark, deve avere) sul disco locale
- testato sia localmente che su FILATO
Codice:
package spark;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class SlowSparkStreamingUpdateStateDemo {
// Very simple state model
static class State implements Serializable {
final List<String> data;
State(List<String> data) {
this.data = data;
}
}
public static void main(String[] args) {
SparkConf conf = new SparkConf()
// Tried KryoSerializer, but it does not seem to help much
//.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local[*]")
.setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)
List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);
JavaPairDStream<String, State> stream = javaStreamingContext
.textFileStream(".") // fake: effectively, no input at all
.mapToPair(input -> (Tuple2<String, State>) null) // fake to get JavaPairDStream
.updateStateByKey(
(inputs, maybeState) -> maybeState, // simplest possible dummy function
new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
initialRdd); // set generated state
stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
System.out.println("Is empty: " + rdd.isEmpty());
return null;
});
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
private static List<Tuple2<String, State>> prepareInitialRddData() {
// 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
int stateCount = 1000;
int dataListSize = 200;
int elementDataSize = 10;
List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
List<String> stateData = new ArrayList<>(dataListSize);
for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
}
initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
}
return initialRddInput;
}
}
Hai provato a porre la domanda nella mailing list? – eliasah
@eliasah Ho [duplicato] (http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Why-internal-processing-costs-are-so-high-to-handle -user-state-of-a-few-MB-td24640.html) lì ieri –
Hai fatto un benchmark su diversi serializzatori? – eliasah