2015-06-27 8 views
16

Napisałem klasę implementującą klasyfikatora w python. Chciałbym użyć Apache Spark, aby zrównoważyć klasyfikację ogromnej liczby punktów danych przy użyciu tego klasyfikatora.Jak używać niestandardowych klas z Apache Spark (pyspark)?

  1. Jestem skonfigurowany przy użyciu Amazon EC2 na klastrze z 10 niewolnikami, opartym na ami, który pochodzi z dystrybucji Pythona na Anacondzie. Ami pozwala mi zdalnie korzystać z notebooka IPython.
  2. Zdefiniowałem klasę BoTree w wywołaniu pliku BoTree.py na wzorcu w folderze /root/anaconda/lib/python2.7/, gdzie są wszystkie moje moduły python:
  3. Sprawdziłem to Mogę zaimportować i używać BoTree.py podczas uruchamiania iskry linii poleceń od mastera (muszę tylko zacząć od napisania importu BoTree i moja klasa BoTree stanie się dostępna
  4. Użyłem iskry/root/iskrow-ec2/copy-dir Skrypt .sh do skopiowania katalogu /python2.7/ w moim klastrze
  5. Ssh-ed do jednego z niewolników i próbowałem uruchomić tam ipython i był w stanie zaimportować BoTree, więc myślę, że moduł został wysłane pomyślnie przez klastry (mogę również zobaczyć plik BoTree.py w .../python2.7/folder)
  6. Na wzorca, który sprawdziłem, mogę pobierać i rozpakowywać instancję BoTree za pomocą cPickle, co rozumiem jako serializator pyspark.

Jednak, kiedy należy wykonać następujące czynności:

import BoTree 
bo_tree = BoTree.train(data) 
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples 
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1])) 
out = rdd.collect() 

Spark nie powiedzie się z powodu błędu (tylko odpowiedni bit myślę):

File "/root/spark/python/pyspark/worker.py", line 90, in main 
    command = pickleSer.loads(command.value) 
    File "/root/spark/python/pyspark/serializers.py", line 405, in loads 
    return cPickle.loads(obj) 
ImportError: No module named BoroughTree 

Czy ktoś może mi pomóc? Nieco zdesperowany ...

Dzięki

Odpowiedz

13

Prawdopodobnie najprostszym rozwiązaniem jest użycie pyFiles argumentu podczas tworzenia SparkContext

from pyspark import SparkContext 
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py']) 

Każdy plik umieszczony tam wysyłają pracowników i dodawane do PYTHONPATH.

Jeśli pracujesz w trybie interaktywnym, musisz zatrzymać istniejący kontekst, używając sc.stop(), zanim utworzysz nowy.

Upewnij się również, że pracownik Spark używa dystrybucji Anaconda, a nie domyślny interpreter języka Python. Na podstawie Twojego opisu jest to najprawdopodobniej problem. Aby ustawić PYSPARK_PYTHON, możesz użyć plików conf/spark-env.sh.

Na bocznym pliku kopiowania notatek do lib jest raczej niechlujne rozwiązanie. Jeśli chcesz uniknąć przesyłania plików za pomocą pyFiles, zalecamy utworzenie pakietu Python lub pakietu Conda i prawidłową instalację. W ten sposób możesz łatwo śledzić, co jest zainstalowane, usuwać niepotrzebne pakiety i unikać trudnych do debugowania problemów.

+0

Dzięki za to. Używam python interakcyjnie, więc nie mogę skonfigurować SparkContext. Jak zrobić w tym przypadku odpowiednik pliku pyFiles? Zdefiniowałem funkcję, która importuje sys, a następnie zwraca sys.executable. Myślę, że to mówi mi, że wszyscy moi niewolnicy prowadzą Anakondę. Jednak jeśli ssh do nich, widzę, że zmienna środowiskowa PYSPARK_PYTHON nie jest ustawiona. Jak mogę edytować PYTHONPATH na moich niewolnikach? – user3279453

+0

W rzeczywistości można utworzyć SparkContext w trybie interaktywnym. Zobacz zaktualizowaną odpowiedź na kilka szczegółów na ten temat i zmienną "PYSPARK_PYTHON" – zero323