2016-01-27 21 views
10

Używam Spark Streaming z dwoma różnymi oknami (w oknie do szkolenia modelu z SKLearn i drugim do przewidywania wartości na podstawie tego modelu) i zastanawiam się, jak mogę uniknąć jednego okna ("powolnego" okna treningowego) do trenowania modelu, bez "blokowania" okna "szybkiego" przewidywania.
Moja uproszczony kod wygląda następująco:Jak uniknąć jednego okna Spark Streaming blokującego inne okno z uruchomionym rodzimym kodem Pythona

conf = SparkConf() 
conf.setMaster("local[4]") 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc, 1) 

stream = ssc.socketTextStream("localhost", 7000) 


import Custom_ModelContainer 

### Window 1 ### 
### predict data based on model computed in window 2 ### 

def predict(time, rdd): 
    try: 
     # ... rdd conversion to df, feature extraction etc... 

     # regular python code 
     X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) 
     pred = Custom_ModelContainer.getmodel().predict(X) 

     # send prediction to GUI 

    except Exception, e: print e 

predictionStream = stream.window(60,60) 
predictionStream.foreachRDD(predict) 


### Window 2 ### 
### fit new model ### 

def trainModel(time, rdd): 
try: 
    # ... rdd conversion to df, feature extraction etc... 

    X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) 
    y = np.array(df.map(lambda lp: lp.label).collect()) 

    # train test split etc... 

    model = SVR().fit(X_train, y_train) 
    Custom_ModelContainer.setModel(model) 

except Exception, e: print e 

modelTrainingStream = stream.window(600,600) 
modelTrainingStream.foreachRDD(trainModel) 

(Uwaga: Custom_ModelContainer jest klasa napisałem, aby zapisać i odzyskać wyszkolony modelu)

Moja konfiguracja zazwyczaj działa dobrze, z wyjątkiem że za każdym razem nowy model jest wyszkolony w drugim oknie (co zajmuje około minuty), pierwsze okna nie obliczają prognoz, dopóki szkolenie modelowe nie zostanie zakończone. Właściwie, wydaje mi się, że to ma sens, ponieważ dopasowanie modelu i prognozy są obliczane na węźle głównym (w ustawieniu niepodzielonym - z powodu SKLearn).

Moje pytanie brzmi: czy możliwe byłoby szkolenie modelu na pojedynczym węźle roboczym (zamiast w węźle głównym)? Jeśli tak, to w jaki sposób mogę osiągnąć ten drugi cel i czy to rzeczywiście rozwiązałoby mój problem?

Jeśli nie, to wszelkie inne sugestie dotyczące tego, jak mogę dokonać takiej konfiguracji bez opóźniania obliczeń w oknie 1?

Każda pomoc jest bardzo doceniana.

EDYCJA: Domyślam się, że bardziej ogólne pytanie brzmi: Jak mogę uruchomić dwa różne zadania na dwóch różnych pracowników równolegle?

Odpowiedz

2

Nota prawna: To tylko zestaw pomysłów. Żadne z nich nie zostało sprawdzone w praktyce.


Kilka rzeczy można spróbować:

  1. Nie collect do predict. scikit-learn modele są zazwyczaj serializable więc proces przewidywania mogą być łatwo obsługiwane w klastrze:

    def predict(time, rdd): 
        ... 
    
        model = Custom_ModelContainer.getmodel() 
        pred = (df.rdd.map(lambda lp: lp.features.toArray()) 
         .mapPartitions(lambda iter: model.predict(np.array(list(iter))))) 
        ... 
    

    Należy nie tylko parallelize prognoz, ale także, jeżeli surowe dane nie są przekazywane do GUI, zmniejszyć ilość danych, które muszą być gromadzone .

  2. Spróbuj wykonać collect i wysłać dane asynchronicznie. PySpark nie przewiduje collectAsync metody ale można starać się osiągnąć coś podobnego z concurrent.futures:

    from pyspark.rdd import RDD 
    from concurrent.futures import ThreadPoolExecutor 
    
    executor = ThreadPoolExecutor(max_workers=4) 
    
    def submit_to_gui(*args): ... 
    
    def submit_if_success(f): 
        if not f.exception(): 
         executor.submit(submit_to_gui, f.result()) 
    

    kontynuować od 1.

    def predict(time, rdd): 
        ... 
        f = executor.submit(RDD.collect, pred) 
        f.add_done_callback(submit_if_success) 
        ... 
    
  3. Jeśli naprawdę chcesz używać lokalnego scikit-learn modelu spróbować collect i fit korzystających futures jak wyżej. Można także spróbować zebrać tylko raz, zwłaszcza jeśli dane nie są buforowane: proces szkolenia

    def collect_and_train(df): 
        y, X = zip(*((p.label, p.features.toArray()) for p in df.collect())) 
        ... 
        return SVR().fit(X_train, y_train) 
    
    def set_if_success(f): 
        if not f.exception(): 
         Custom_ModelContainer.setModel(f.result()) 
    
    def trainModel(time, rdd): 
        ... 
        f = excutor.submit(collect_and_train, df) 
        f.add_done_callback(set_if_success) 
        ... 
    
  4. Przenieś do klastra albo za pomocą już istniejących rozwiązań jak spark-sklearn lub niestandardowego podejścia:

    • rozwiązanie naiwny - przygotuj swoje dane, coalesce(1) i trenuj jeden model za pomocą mapPartitions.
    • rozwiązanie rozproszone - twórz i sprawdzaj osobny model na partycję przy użyciu mapPartitions, zbieraj modele i używaj jako zespołu, na przykład wykonując średnią lub medianę przewidywania.
  5. wyrzucić scikit-learn i wykorzystywać model, który może być przeszkolony i utrzymywane w sposób rozproszony, streaming środowiska (np StreamingLinearRegressionWithSGD).

    Twoje obecne podejście sprawia, że ​​Spark jest przestarzały. Jeśli możesz ćwiczyć model lokalnie, istnieje duża szansa, że ​​możesz wykonać wszystkie inne zadania znacznie szybciej na lokalnej maszynie. W przeciwnym razie Twój program po prostu zawiedzie na collect.

1

Myślę, że to, czego szukasz, to własność: "spark.streaming.concurrentJobs", która domyślnie wynosi 1. Zwiększenie tej wartości powinno umożliwić równoległe uruchamianie wielu funkcji foreachRDD.

In JobScheduler.scala:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 

Przypominamy być również świadomi bezpieczeństwa wątku na niestandardowym modelu pojemnika, jeśli masz zamiar być mutowania i czytając równolegle. :)

Powiązane problemy