2014-09-19 7 views
19

Biorąc aplikacji SparkJakie czynniki decydują o liczbie executorów w trybie samodzielnym?

  1. Jakie czynniki decydują liczbę wykonawców w trybie stand alone? W dokumentach Mesos and YARN zgodnie z dokumentami this możemy określić liczbę executorów/rdzeni i pamięci.

  2. Po uruchomieniu kilku executorów. Czy Spark rozpoczyna zadania w sposób okrągły lub jest wystarczająco inteligentny, aby sprawdzić, czy niektóre z executorów są bezczynne/zajęte, a następnie odpowiednio zaplanować zadania.

  3. Ponadto, w jaki sposób Spark decyduje o liczbie zadań? Zrobiłem write prosty program temperatury maksymalnej z małym zestawem danych i Spark zrodził dwa zadania w jednym executorze. Jest to tryb samodzielny Spark.

Odpowiedz

2

Spark wybiera liczbę zadań na podstawie liczby partycji w oryginalnym zbiorze danych. Jeśli używasz HDFS jako źródła danych, to domyślnie liczba partycji jest równa liczbie bloków HDFS. Możesz zmienić liczbę partycji na kilka różnych sposobów. Pierwsze dwa: jako dodatkowy argument do metody SparkContext.textFile; przez wywołanie metody RDD.repartion.

+0

Położyłem plik 5 bloków i zobacz 5 zadań jest Spark. Wygląda na to, że jeden blok tworzy dwie partycje. Dla więcej niż jednego bloku tworzona jest ta sama liczba partycji. –

19

Odpowiadając na Twoje pytania:

  1. Tryb Autonomiczny używa tej samej zmiennej konfiguracji jak Mesos i trybów przędzy ustawić liczbę wykonawców. Zmienna spark.cores.max określa maksymalną liczbę rdzeni używanych w Kontekście iskrowym. Domyślną wartością jest nieskończoność, więc Spark użyje wszystkich rdzeni w klastrze. Zmienna spark.task.cpus określa ile procesorów Spark przydzieli dla jednego zadania, domyślna wartość to 1. Za pomocą tych dwóch zmiennych można zdefiniować maksymalną liczbę równoległych zadań w klastrze.

  2. Po utworzeniu podklasy RDD można zdefiniować, w których komputerach ma działać. Jest to zdefiniowane w metodzie getPreferredLocations. Ale ponieważ sygnatury metod sugerują, że jest to tylko preferencja, więc jeśli Spark wykryje, że jeden komputer nie jest zajęty, uruchomi to zadanie na tej bezczynnej maszynie. Jednak nie znam mechanizmu używanego przez Sparka, aby wiedzieć, jakie maszyny są bezczynne. Aby osiągnąć lokalność, my (Stratio) postanowiliśmy, aby każda część była mniejsza, aby zadanie zajęło mniej czasu i osiągnąć lokalność.

  3. Liczba zadań każdej operacji Sparka jest określona zgodnie z długością partycji RDD. Wektor ten jest wynikiem metody getPartitions, którą należy przesłonić, jeśli chcemy opracować nową podklasę RDD. Ta metoda zwraca, w jaki sposób dzieli się RDD, gdzie znajdują się informacje i partycje. Po dołączeniu dwóch lub więcej RDD przy użyciu, na przykład, operacji łączenia lub łączenia, liczba zadań wynikowego RDD jest maksymalną liczbą zadań RDD zaangażowanych w operację. Na przykład: jeśli dołączysz do RDD1, który ma 100 zadań i RDD2, który ma 1000 zadań, następna operacja wynikowego RDD będzie miała 1000 zadań. Zwróć uwagę, że duża liczba partycji niekoniecznie jest synonimem większej ilości danych.

Mam nadzieję, że to pomoże.

+1

Po ustawieniu parametru num-executors w trybie autonomicznym zawsze używany był jeden executor. –

7

Zgadzam się z @jlopezmat o tym, jak Spark wybiera swoją konfigurację. W odniesieniu do twojego kodu testowego widzisz dwa zadania ze względu na sposób implementacji textFile.Od SparkContext.scala:

/** 
    * Read a text file from HDFS, a local file system (available on all nodes), or any 
    * Hadoop-supported file system URI, and return it as an RDD of Strings. 
    */ 
    def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { 
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 
     minPartitions).map(pair => pair._2.toString) 
    } 

i jeśli możemy sprawdzić, co jest wartością defaultMinPartitions:

/** Default min number of partitions for Hadoop RDDs when not given by user */ 
    def defaultMinPartitions: Int = math.min(defaultParallelism, 2) 
+1

Dzięki Daniel - Kiedy załadowałem mały plik z HDFS, liczba partycji była domyślnie dwie (RDD.getNumPartitions()), jak wspomniano. Tak więc rozpoczęto dwa zadania dla każdej partycji, a więc dwa pliki w HDFS. Zrobiłem koalesce (1), a liczba plików na wyjściu wyniosła 1. –

0

Odpowiadając na kilka punktów, które nie zostały uwzględnione w poprzednich odpowiedzi:

  • w trybie autonomicznym, musisz grać z --executor-cores i --max-executor-cores, aby ustawić liczbę uruchamianych executorów (przy założeniu, że masz wystarczającą ilość pamięci aby dopasować tę liczbę, jeśli podasz --executor-memory)

  • Spark nie przydzielać zadania w sposób okrężny, używa mechanizmu zwanego „Delay Scheduling”, która jest techniką pull opartych pozwalając każdej wykonawcę do zaoferowania to dostępność do kapitan, który zdecyduje, czy wysłać zadanie.

Powiązane problemy