2016-06-10 9 views
7

EksperymentSpark SQL: Dlaczego dwa zadania dla jednego zapytania?

Próbowałem następujący fragment na Spark 1.6.1.

val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files 
soDF.registerTempTable("so") 
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/") 

Physical Plan jest:

== Physical Plan == 
Sort [cnt#59L ASC], true, 0 
+- ConvertToUnsafe 
    +- Exchange rangepartitioning(cnt#59L ASC,200), None 
     +- ConvertToSafe 
     +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L]) 
      +- TungstenExchange hashpartitioning(dpHour#38,200), None 
       +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L]) 
        +- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder 

dla tego zapytania, mam dwa zadania: Job 9 i Job 10 enter image description here

Dla Job 9 The DAG jest:

enter image description here

Dla Job 10 The DAG jest:

enter image description here

Obserwacje

  1. Podobno istnieją dwa jobs dla jednego zapytania.
  2. Stage-16 (oznaczone jako Stage-14 w Job 9) jest pomijane w Job 10.
  3. Stage-15 's ostatni RDD[48], jest taki sam jak Stage-17 z ostatnich RDD[49]. How? Widziałem w dziennikach, że po Stage-15 egzekucji, RDD[48] jest zarejestrowany jako RDD[49]
  4. Stage-17 jest pokazane w driver-logs ale nigdy nie zrealizowanych na Executors. Na driver-logs pokazano wykonanie zadania, ale gdy przyjrzałem się dziennikom kontenera Yarn, nie było żadnych dowodów na to, że otrzymano task z Stage-17.

Logi wspierające te obserwacje (tylko driver-logs, Zgubiłem executor logi z powodu późniejszej awarii). Okazuje się, że przed rozpoczęciem Stage-17, RDD[49] jest zarejestrowany:

16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200) 
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200) 
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool 
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s 
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s 
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter 
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1 
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26 
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26) 
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions 
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26) 
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17) 
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17) 
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents 
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB) 
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB) 
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB) 
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006 
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26) 
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes) 
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes) 

pytania

  1. Dlaczego dwa Jobs? Jaki jest zamysł, przełamując DAG na dwa jobs?
  2. Job 10 's DAG wygląda kompletny do wykonania kwerendy. Czy robi coś konkretnego Job 9?
  3. Dlaczego Stage-17 nie jest pomijany? Wygląda na to, że tworzone są manekiny tasks, czy mają jakiś cel.
  4. Później próbowałem jeszcze prostszego zapytania.Niespodziewanie tworzył on 3 Jobs.

    sqlContext.sql ("wybierz dpHour z tak aby przez dphour"). Write.parquet ("/ OUT2 /")

Odpowiedz

4

Kiedy używasz wysokim poziomie dataframe/zestawu danych API, ty pozostaw to Sparkowi do określenia planu wykonania, w tym chrupania zadania/etapu. Zależą one od wielu czynników, takich jak równoległość wykonywania, buforowane/utrwalone struktury danych itp. W przyszłych wersjach Sparka, wraz ze wzrostem optymalizacji optymalizatora, można zobaczyć jeszcze więcej zadań na jedno zapytanie, ponieważ na przykład niektóre źródła danych są próbkowane w celu sparametryzowania optymalizacja wykonania kosztowego.

Na przykład często, ale nie zawsze, piszę generować oddzielne zadania z przetwarzania obejmującego tasowanie.

Podsumowując, jeśli korzystasz z interfejsów API wysokiego poziomu, o ile nie musisz wykonywać bardzo szczegółowej optymalizacji przy dużych wolumenach danych, rzadko się opłaca zagłębiać w konkretne fragmenty. Koszty uruchamiania zadań są bardzo niskie w porównaniu do przetwarzania/produkcji.

Jeśli z drugiej strony jesteś ciekawy wnętrza internatu, przeczytaj kod optymalizatora i wejdź na listę dyskusyjną programisty Spark.

+1

To ciekawe, dlaczego drugie etapy pracy nie mogą znaleźć się w pierwszej pracy? –

+1

Dobre pytanie. Może to mieć związek z generowaniem wyniku pośredniego. Ważne pytanie brzmi: dlaczego ma znaczenie, w jaki sposób DAG jest mapowane na etapy i miejsca pracy? – Sim

+1

Tak, trudno jest naprawdę zrozumieć, w jaki sposób Spark to robi, połączenie dostępnych zasobów, danych .... –

Powiązane problemy