2015-03-02 15 views
10

Napisałem ten program w iskrą shellDlaczego foreach nie przynosi niczego do programu sterownika?

val array = sc.parallelize(List(1, 2, 3, 4)) 
array.foreach(x => println(x)) 

ten drukuje niektóre wypowiedzi debugowania, ale nie rzeczywiste liczby. poniżej

Kod działa poprawnie

for(num <- array.take(4)) { 
    println(num) 
} 

rozumiem, że take to działanie, a więc spowoduje iskra wyzwolić leniwe obliczenia.

Ale foreach powinno pracowały w ten sam sposób ... dlaczego nie foreach wnoszą nic z powrotem od iskry i zacząć robić rzeczywiste przetwarzanie (wyjść z trybu leniwe)

Jak mogę dokonać foreach na RDD praca?

Odpowiedz

23

Metoda RDD.foreach w Spark działa w klastrze, więc każdy pracownik zawierający te rekordy wykonuje operacje w postaci foreach. To znaczy. Twój kod działa, ale drukuje on na stacjach roboczych Spark, a nie w sterowniku/sesji powłoki. Jeśli spojrzysz na wynik (stdout) dla swoich pracowników Sparka, zobaczysz je wydrukowane na konsoli.

Możesz wyświetlić standardowe wyświetlanie na robotach, przechodząc do web gui uruchomionego dla każdego uruchomionego executora. Przykładem URL http://workerIp:workerPort/logPage/?appId=app-20150303023103-0043&executorId=1&logType=stdout

Spark Executor Stdout

W tym przykładzie Spark zdecyduje się umieścić wszystkie rekordy z RDD w tej samej partycji.

Ma to sens, jeśli się nad tym zastanowić - spójrz na sygnaturę funkcji dla foreach - nic nie zwraca.

/** 
* Applies a function f to all elements of this RDD. 
*/ 
def foreach(f: T => Unit): Unit 

To jest naprawdę celem foreach w Scala - używany do jej efekt uboczny.

Gdy zbieracie rekordy, sprowadzacie je z powrotem do sterownika, więc logicznie zbieramy/bierzemy operacje są właśnie uruchomione w kolekcji Scala w sterowniku Spark - widać dane wyjściowe dziennika, jak sterownik iskrzenia/iskra jest tym, co drukuje na stdout w twojej sesji.

Przypadek użycia foreach może nie wydawać się od razu widoczny, na przykład - jeśli dla każdego rekordu w RDD chcesz wykonać pewne zewnętrzne zachowanie, na przykład wywołanie interfejsu API REST, możesz to zrobić w foreach, a następnie każdy Spark pracownik wysłałby połączenie do serwera API z wartością. Jeśli foreach przywróci zapisy, możesz łatwo zdmuchnąć pamięć w procesie sterownika/powłoki. W ten sposób unikasz tych problemów i możesz wykonywać działania niepożądane we wszystkich elementach RDD w klastrze.

Jeśli chcesz zobaczyć, co jest w RDD używam;

array.collect.foreach(println) 
//Instead of collect, use take(...) or takeSample(...) if the RDD is large 
+1

Foreach jest wielki, kiedy musisz zaktualizować akumulator wewnątrz funkcji i chcesz, aby akcja gwarantowała aktualizację tylko raz. O ile mogę powiedzieć, jest to jedyna akcja w Spark, która pozwala mi na czysto uruchomić funkcję na RDD. ([Per the Spark Docs] (http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameacclinka) W przypadku aktualizacji akumulatorów wykonanych wyłącznie w ramach akcji, Spark gwarantuje aktualizację każdego zadania do akumulator zostanie zastosowany tylko raz, tzn. zrestartowane zadania nie zaktualizują wartości) – JimLohse

Powiązane problemy