2016-06-11 15 views
12

Mam robotę iskrową, która pobiera plik z 8 rekordami z hdfs, wykonuje prostą agregację i zapisuje ją z powrotem do Hadoop. Zauważam, że są setki zadań, kiedy to robię.Dlaczego tak wiele zadań w mojej pracy iskier?

Nie jestem również pewien, dlaczego istnieje wiele zadań do tego? Myślałem, że praca była bardziej podobna do akcji. Mogę spekulować, dlaczego, ale rozumiem, że wewnątrz tego kodu powinno to być jedno zlecenie, które powinno być podzielone na etapy, a nie wiele zadań. Dlaczego nie dzieli się go na etapy, jak to się dzieje w pracy?

Jeśli chodzi o zadania o wielkości 200 plus, ponieważ ilość danych i liczba węzłów jest niewielka, nie ma sensu, aby było 25 zadań dla każdego wiersza danych, gdy istnieje tylko jedna agregacja i kilka filtrów. Dlaczego nie miałoby to jednego zadania na partycję na operację atomową?

Oto odpowiedni kod Scala -

import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object TestProj {object TestProj { 
    def main(args: Array[String]) { 

    /* set the application name in the SparkConf object */ 
    val appConf = new SparkConf().setAppName("Test Proj") 

    /* env settings that I don't need to set in REPL*/ 
    val sc = new SparkContext(appConf) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt") 

    /*the below rdd will have schema defined in Record class*/ 
    val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt") 
         .map(x=>x.split(" ")) //file record into array of strings based spaces 
         .map(x=>Record(
            x(0).toInt, 
            x(1).asInstanceOf[String], 
            x(2).asInstanceOf[String], 
            x(3).toInt 
            )) 


    /* the below dataframe groups on first letter of first name and counts it*/ 
    val aggDF = rddCase.toDF() 
         .groupBy($"firstName".substr(1,1).alias("firstLetter")) 
         .count 
         .orderBy($"firstLetter") 

    /* save to hdfs*/ 
    aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg") 

    } 

    case class Record(id: Int 
        , firstName: String 
        , lastName: String 
        , quantity:Int) 

} 

Poniżej znajduje się zrzut ekranu po kliknięciu w aplikacji enter image description here

Poniżej są etapy pokazać podczas przeglądania konkretnej „pracy” z identyfikatorem 0 enter image description here

Poniżej znajduje się pierwsza część ekranu po kliknięciu na scenie z ponad 200 zadaniami

enter image description here

Jest to druga część ekranu wewnątrz etapie enter image description here

Poniżej znajduje się po kliknięciu na „wykonawców” Zakładka enter image description here

Zgodnie z życzeniem, tutaj są etapy Job ID 1

enter image description here

Oto th Szczegóły E dla etapu ID pracą 1 z 200 zadań

enter image description here

Odpowiedz

17

Jest to klasyczny Spark pytanie.

Dwa zadania używane do odczytu (Stage ID 0 na drugiej ilustracji) to ustawienie defaultMinPartitions, które jest ustawione na 2. Możesz uzyskać ten parametr, odczytując wartość w REPL sc.defaultMinPartitions. Powinien być również widoczny w interfejsie Spark pod kranem "Środowisko".

Możesz spojrzeć na code z github, aby zobaczyć, co to dokładnie dzieje się. Jeśli chcesz, aby więcej partycji było używanych do odczytu, po prostu dodaj go jako parametr, np. sc.textFile("a.txt", 20).

Teraz interesująca część pochodzi z 200 partycji, które pojawiają się na drugim etapie (Etap 1 na drugiej figurze). Cóż, za każdym razem, gdy następuje przetasowanie, Spark musi zdecydować, ile partycji będzie miało przetasowanie RDD. Jak możesz sobie wyobrazić, domyślną wartością jest 200.

Można to zmienić za pomocą:

sqlContext.setConf("spark.sql.shuffle.partitions", "4”) 

przypadku uruchomienia kodu z tej konfiguracji będzie można zauważyć, że 200 partycje nie są tam będzie więcej. Jak ustawić ten parametr jest rodzajem sztuki. Może wybierz 2x liczbę rdzeni, które masz (lub cokolwiek).

Sądzę, że Spark 2.0 ma sposób automatycznego określenia najlepszej liczby partycji dla losowych RDD. Nie mogę się doczekać!

Ostatecznie liczba otrzymywanych zadań ma związek z liczbą wykonanych zoptymalizowanych kodów DataFrame . Jeśli czytasz specyfikację Spark, mówi, że każda akcja RDD wyzwala jedno zadanie. Gdy działanie obejmuje ramkę danych lub SparkSQL, optymalizator Catalyst opracuje plan wykonania i wygeneruje kod oparty na RDD w celu jego wykonania. Trudno powiedzieć dokładnie, dlaczego używa dwóch akcji w twojej sprawie. Być może trzeba spojrzeć na zoptymalizowany plan kwerend, aby zobaczyć dokładnie, co robi.

+0

Dzięki człowiek! Zrobię to natychmiast, aby to sprawdzić. A co z wieloma zleceniami? Dlaczego są dwie prace? –

+1

Czy masz ekran dla etapów Job ID 1? – marios

+0

Dodałem je do OP –

1

Mam podobny problem. Ale w moim scenariuszu zbierana przeze mnie kolekcja ma mniej elementów niż liczba zadań zaplanowanych przez Spark (co powoduje, że iskra czasami zachowuje się dziwnie). Używając wymuszonego numeru partycji, udało mi się naprawić ten problem.

To było coś takiego:

collection = range(10) # In the real scenario it was a complex collection 
sc.parallelize(collection).map(lambda e: e + 1) # also a more complex operation in the real scenario 

Potem ujrzałem w dzienniku Spark:

INFO YarnClusterScheduler: Adding task set 0.0 with 512 tasks 
Powiązane problemy