2016-06-05 10 views
5

Grałem z Spark i Python na tym internetowym jupyter notebooka https://tmpnb.org/ i 3 sposoby próbowali przekazać funkcje Pythona:Przechodząc funkcje Pythona jako obiekty do Spark

1), używając mapy

import numpy as np 
def my_sqrt(x): 
    return np.sqrt(x) 

sc.parallelize(range(10)).map(my_sqrt).collect() 

2) parallelizing my_sqrt i nazwać

sc.parallelize([(my_sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect() 

3) na równoległy np.sqrt i nazywają to

sc.parallelize([(np.sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect() 

(1) i (3) wykonywać pracę i (2) nie robi. Najpierw chciałbym zrozumieć, dlaczego/jak (1) i (3) działają. Po drugie, chciałbym zrozumieć, dlaczego (2) nie i co można zrobić, aby działało.

Odpowiedz

6

Pierwszy podejście działa, ponieważ Spark jest za pomocą specjalnych strategii serializacji przetwarzać zamknięcia wymaganych dla przekształceń, które jest znacznie wolniejsze, ale bardziej wydajne niż średnia pickle (inaczej nie mogliśmy skorzystać .map(lambda x: ...)).

Ostatnie podejście działa, ponieważ nie ma potrzeby serializowania kodu funkcji w ogóle. Odwołuje się do modułu sqrt od numpy tak długo, jak NumPy jest dostępny dla każdego pracownika, nie ma problemu.

Drugie podejście nie działa, ponieważ marynowanie nie powoduje serializacji kodu.

import pickle 

pickle.dumps(my_sqrt) 
## b'\x80\x03c__main__\nmy_sqrt\nq\x00.' 

Wszystko robi stwierdza proszę mi dać obiekt przypisany do my_sqrt (my_sqrt.__name__) ze skryptu środowiska najwyższego poziomu (a.k.a. __main__). Gdy jest wykonywany na robotach, nie korzysta z tego samego środowiska i nie ma już takiego obiektu w zakresie, stąd wyjątek. Dla jasności nie jest to ani błąd, ani coś konkretnego dla Sparka. można łatwo odtworzyć samo zachowanie lokalnie następująco:

In [1]: import pickle 

In [2]: def foo(): ... 

In [3]: foo_ = pickle.dumps(foo) 

In [4]: pickle.loads(foo_) 
Out[4]: <function __main__.foo> 

In [5]: del foo 

In [6]: pickle.loads(foo_) 
--------------------------------------------------------------------------- 
AttributeError       Traceback (most recent call last) 
... 

AttributeError: Can't get attribute 'foo' on <module '__main__'> 

Ponieważ nie zajmuje się rzeczywistej wartości można nawet przypisać tak:

In [7]: foo = "foo" 

In [8]: pickle.loads(foo_) 
Out[8]: 'foo' 

Zabierz wiadomość tutaj jest jeśli chcesz użyć funkcji w ten sposób, umieść ją w osobnym module i rozprowadź wśród pracowników tak samo, jak w przypadku innych zależności, w tym definicji klas niestandardowych.

+1

Czy możesz umieścić link do jakiegoś materiału, który rozszerza się na "specjalną strategię serializacji", której używa iskra zamiast marynowania? – x89a10

+1

@ x89a10 Spark używa [zmodyfikowanej wersji] (https://github.com/apache/spark/blob/33ae7a35daa86c34f1f9f72f997e0c2d4cd8abec/python/pyspark/cloudpickle.py) z ['cloudpickle'] (https://github.com/cloudpipe/cloudpickle). Istnieje kilka innych elementów rozproszonych, ale to powinno dać ci wystarczająco dużo, aby zrozumieć różnicę. – zero323

Powiązane problemy