2015-11-29 17 views
7

Sto valutando Spark SQL per implementare un semplice modulo di reporting (alcune semplici aggregazioni su dati Avro già archiviati su HDFS). Non ho dubbi sul fatto che Spark SQL possa adattarsi bene sia ai miei requisiti funzionali che a quelli non funzionali.Come velocizzare i test delle unità di Spark SQL?

Tuttavia, oltre ai requisiti di produzione, desidero assicurarmi che il modulo sia testabile. Seguiamo un approccio BDD con scenari molto mirati, il che significa che questo modulo richiederà di eseguire decine/centinaia di query SQL su alcuni dati molto semplici (1..10 record).

Per avere un'idea approssimativa delle prestazioni posso aspettarmi da Spark SQL in modalità locale, ho subito realizzato il prototipo alcuni test:

  1. select count(*) from myTable
  2. select key, count(*) from myTable group by key

Il primo test richiede in media 100ms, ma il secondo richiede 500ms. Tale prestazione è inaccettabile, ciò renderebbe la suite di test troppo lenta.

Per confronto, posso eseguire lo stesso test in 10 ms utilizzando Crunch e la sua MemPipeline (1500ms con MRPipeline in modalità locale) e anche 1500ms con Hive in modalità incorporata. Spark SQL è quindi un po 'più veloce di MR in modalità locale, ma è comunque un modo per rallentare la creazione di buone suite di test.

È possibile velocizzare Spark SQL in modalità locale?

Esiste un modo migliore/più veloce per testare un modulo Spark SQL?

(non ho ancora profilata l'esecuzione, ma dal momento che un groupBy().countByKey() su un RDD prende 40ms, in media, mi aspetto di scoprire che il colpevole è l'ottimizzatore di query)


mio rapido & codice di prova sporca segue:

SparkConf sparkConf = new SparkConf() 
       .setMaster("local[4]") 
       .setAppName("poc-sparksql"); 

    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) { 
     SQLContext sqlCtx = new SQLContext(ctx); 

     for (int i = 0; i < ITERATIONS; i++) { 
      Stopwatch testCaseSw = new Stopwatch().start(); 

      DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro"); 
      df.registerTempTable("myTable"); 
      DataFrame result = sqlCtx.sql("select count(*) from myTable"); 

      System.out.println("Results: " + result.collectAsList()); 
      System.out.println("Elapsed: " + testCaseSw.elapsedMillis()); 
     } 

     for (int i = 0; i < ITERATIONS; i++) { 
      Stopwatch testCaseSw = new Stopwatch().start(); 

      DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro"); 
      df.registerTempTable("myTable"); 
      DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a "); 

      System.out.println("Results: " + result.collectAsList()); 
      System.out.println("Elapsed: " + testCaseSw.elapsedMillis()); 
     } 
} 
+0

Avete considerato la cache? – eliasah

+0

Se si stanno testando diverse query sugli stessi dati, caricare i dati una volta .. e quindi eseguire una query. –

+0

Secondo i miei test la cache non aiuta (la chiamata sql è la cosa lenta). Stavo pensando più a qualcosa come essere in grado di disabilitare alcune ottimizzazioni. Non vedo la cache come soluzione perché 1- Un test "buono" ha il suo input progettato per facilitare la comprensione di un dato comportamento, quindi ogni test ha input diversi.Il codice sciatto che ho fornito non tenta di imitare quello che farebbe una suite di test (serializzazione automatica di Avro di una tabella di cetriolini ecc.). 2- Le query SQL essendo deterministiche se l'input fosse sempre lo stesso, avevo cache dei dati di output raccolti anziché input –

risposta

0

Se si sta osservando l'ottimizzazione del livello ms ci sono vari indicatori.

  1. Leggere i dati una volta e memorizzare nella cache e solo la query SQL su di esso più volte. all'interno del carico del circuito significa "generare nuova operazione nella everyIteartion"
DataFrame df = sqlCtx.load("/tmp/test.avro","com.databricks.spark.avro"); 
df.registerTempTable("myTable"); 
df.cache() 

for (int i = 0; i < ITERATIONS; i++) { 
     Stopwatch testCaseSw = new Stopwatch().start(); 
     DataFrame result = sqlCtx.sql("select count(*) from myTable"); 
     // Dont do printLn inside the loop , save the output in some hashMap and print it later once the loop is complete 
     System.out.println("Results: " + result.collectAsList()); 
     System.out.println("Elapsed: " + testCaseSw.elapsedMillis()); 
} 
  1. estratto fuori System.out.println all'esterno del ciclo come consumare un certo tempo.

prega di dare un'occhiata: http://bytepadding.com/big-data/spark/understanding-spark-through-map-reduce/

Problemi correlati