2016-06-10 20 views
7

EsperimentoSpark SQL: Perché due processi per una query?

ho provato il seguente frammento su Spark 1.6.1.

val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files 
soDF.registerTempTable("so") 
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/") 

Il Physical Plan è:

== Physical Plan == 
Sort [cnt#59L ASC], true, 0 
+- ConvertToUnsafe 
    +- Exchange rangepartitioning(cnt#59L ASC,200), None 
     +- ConvertToSafe 
     +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L]) 
      +- TungstenExchange hashpartitioning(dpHour#38,200), None 
       +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L]) 
        +- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder 

Per questa query, ho avuto due lavori: Job 9 e Job 10 enter image description here

Per Job 9, il DAG è:

enter image description here

Per Job 10, il DAG è:

enter image description here

Osservazioni

  1. A quanto pare, ci sono due jobs per una query.
  2. Stage-16 (contrassegnato come Stage-14 in Job 9) viene saltato in Job 10.
  3. Stage-15 ultimo RDD[48], è l'analogo a Stage-17 ultimo RDD[49]. Come? Ho visto nei registri che dopo Stage-15 esecuzione, la RDD[48] è registrato come RDD[49]
  4. Stage-17 è indicata nella driver-logs, ma non ha mai avuto eseguiti a Executors. Su driver-logs viene visualizzata l'esecuzione dell'attività, ma quando ho guardato i registri del contenitore Yarn, non è stato riscontrato alcun ricevimento di task da Stage-17.

I registri che supportano queste osservazioni (solo driver-logs, ho perso i registri executor a causa di un arresto successivo). Si è visto che prima di Stage-17 inizia, RDD[49] è registrato:

16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200) 
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200) 
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool 
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s 
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s 
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter 
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26 
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26) 
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions 
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26) 
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17) 
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17) 
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents 
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB) 
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB) 
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB) 
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006 
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26) 
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes) 

Domande

  1. Perché due Jobs? Qual è l'intenzione qui rompendo uno DAG in due jobs?
  2. Job 10 's DAG sguardi completo per l'esecuzione della query. C'è qualcosa di specifico Job 9 sta facendo?
  3. Perché Stage-17 non viene saltato? Sembra che vengano creati i dummy tasks, hanno qualche scopo.
  4. Successivamente, ho provato un'altra query piuttosto semplice.Inaspettatamente, stava creando 3 Jobs.

    sqlContext.sql ("select dpHour da così fine da dphour"). Write.parquet ("/ OUT2 /")

risposta

4

Quando si utilizza le API di alto livello dataframe/set di dati, è lasciarlo a Spark per determinare il piano di esecuzione, incluso il lavoro/stage chunking. Questi dipendono da molti fattori come il parallelismo di esecuzione, le strutture di dati cache/persistenti, ecc. Nelle versioni future di Spark, con l'aumentare delle sofisticazione dell'ottimizzatore, è possibile visualizzare ancora più lavori per query, ad esempio, alcune origini dati sono campionate per parametrizzare ottimizzazione dell'esecuzione basata sui costi.

Ad esempio, ho visto spesso, ma non sempre, visto che la scrittura genera processi separati dall'elaborazione che coinvolge lo shuffle.

In conclusione, se si utilizzano le API di alto livello, a meno che non si debba eseguire un'ottimizzazione estremamente dettagliata con enormi volumi di dati, raramente si paga per scavare nello specifico chunking. I costi di avvio del lavoro sono estremamente bassi rispetto all'elaborazione/all'output.

Se, invece, siete curiosi di conoscere gli interni di Spark, leggete il codice di ottimizzazione e contattate la mailing list degli sviluppatori di Spark.

+1

Questo è curioso, perché le seconde fasi di lavoro non possono essere nel primo lavoro? –

+1

Buona domanda. Potrebbe avere a che fare con la generazione di risultati intermedi. La domanda importante è: perché importa come un DAG è mappato a stage e lavori? – Sim

+1

Sì, è difficile capire veramente come sta facendo Spark, un mix di risorse disponibili, dati .... –