2015-11-15 7 views
15

Uruchomiłem tę samą symulację w pętli o różnych parametrach. Każda symulacja wykorzystuje pandas DataFrame (data), która jest tylko odczytywana, nigdy nie jest modyfikowana. Korzystanie ipyparallel (ipython równoległy), mogę umieścić ten DataFrames do zmiennej globalnej przestrzeni każdego silnika moim zdaniem przed symulacji start:Jak najlepiej udostępniać dane statyczne między klientem ipyparallel a zdalnymi silnikami?

view['data'] = data 

Silniki potem mieć dostęp do DataFrame dla wszystkich symulacji, które się poruszają się na im. Proces kopiowania danych (jeśli jest marynowany, data to 40 MB) to tylko kilka sekund. Wydaje się jednak, że jeśli liczba symulacji rośnie, zużycie pamięci rośnie bardzo duże. Wyobrażam sobie, że te wspólne dane są kopiowane dla każdego zadania, a nie tylko dla każdego silnika. Jaka jest najlepsza praktyka udostępniania statycznych danych tylko do odczytu przez klienta za pomocą silników? Kopiowanie go raz na silnik jest dopuszczalne, ale najlepiej byłoby skopiować go tylko raz na hosta (mam 4 silniki na host1 i 8 na hoście2).

Oto mój kod:

from ipyparallel import Client 
import pandas as pd 

rc = Client() 
view = rc[:] # use all engines 
view.scatter('id', rc.ids, flatten=True) # So we can track which engine performed what task 

def do_simulation(tweaks): 
    """ Run simulation with specified tweaks """ 
    # Do sim stuff using the global data DataFrame 
    return results, id, tweaks 

if __name__ == '__main__': 
    data = pd.read_sql("SELECT * FROM my_table", engine) 
    threads = [] # store list of tweaks dicts 
    for i in range(4): 
     for j in range(5): 
      for k in range(6): 
       threads.append(dict(i=i, j=j, k=k) 

    # Set up globals for each engine. This is the read-only DataFrame 
    view['data'] = data 
    ar = view.map_async(do_simulation, threads) 

    # Our async results should pop up over time. Let's measure our progress: 
    for idx, (results, id, tweaks) in enumerate(ar): 
     print 'Progress: {}%: Simulation {} finished on engine {}'.format(100.0 * ar.progress/len(ar), idx, id) 
     # Store results as a pickle for the future 
     pfile = '{}_{}_{}.pickle'.format(tweaks['i'], tweaks['j'], tweaks['j']) 
     # Save our results to a pickle file 
     pd.to_pickle(results, out_file_path + pfile) 

    print 'Total execution time: {} (serial time: {})'.format(ar.wall_time, ar.serial_time) 

Jeśli liczba symulacji są małe (~ 50), to zajmuje trochę czasu, aby zacząć, ale zaczynam widzieć instrukcje print postęp. Co dziwne, wiele zadań zostanie przydzielonych do tego samego silnika i nie widzę odpowiedzi, dopóki wszystkie te zadania nie zostaną wykonane dla tego silnika. Spodziewam się uzyskać odpowiedź od enumerate(ar) za każdym razem, gdy zakończy się jedno zadanie symulacji.

Jeśli liczenie symulacji jest duże (~ 1000), rozpoczęcie pracy jest bardzo czasochłonne, widzę procesor przepustnicy w górę we wszystkich silnikach, ale żadne instrukcje drukowania postępów nie są widoczne aż do dłuższego czasu (~ 40 min), oraz kiedy widzę postęp, wydaje się, że duży blok (> 100) zadań trafił do tego samego silnika i oczekiwano ukończenia z tego jednego silnika, zanim nastąpił pewien postęp. Kiedy ten silnik się ukończył, zobaczyłem, że obiekt ar dostarczył nowe odpowiedzi w ciągu 4 sekund - mogło to być opóźnienie w zapisywaniu wyjściowych plików pikla.

Wreszcie, host1 uruchamia także zadanie ipycontroller, a jego użycie pamięci wzrasta jak szalone (zadanie w języku Python pokazuje przy użyciu> 6 GB pamięci RAM, zadanie jądra pokazuje przy użyciu 3 GB). Silnik host2 tak naprawdę nie wykazuje dużego wykorzystania pamięci. Co spowodowałoby ten skok w pamięci?

+0

czy kiedykolwiek znalazłeś rozwiązanie? To jest problem, przed którym teraz stoimy. – DrSAR

Odpowiedz

6

Użyłem tej logiki w kodzie kilka lat temu, a ja użyłem this. Mój kod to coś jak:

shared_dict = { 
    # big dict with ~10k keys, each with a list of dicts 
} 

balancer = engines.load_balanced_view() 

with engines[:].sync_imports(): # your 'view' variable 
    import pandas as pd 
    import ujson as json 

engines[:].push(shared_dict) 

results = balancer.map(lambda i: (i, my_func(i)), id) 
results_data = results.get() 

Jeśli liczba symulacji są małe (~ 50), to zajmuje trochę czasu, aby dostać zaczął, ale wiem od czego zacząć, aby zobaczyć instrukcje print postęp. O dziwo, wiele zadań zostanie przypisanych do tego samego silnika i nie widzę odpowiedzi , dopóki wszystkie te zadania nie zostaną wykonane dla tego silnika . Spodziewam się zobaczyć odpowiedź z wyliczeniem (ar) za każdym razem, gdy jedno zadanie symulacji zostanie zakończone.

W moim przypadku, my_func() był złożoną metodą, w której umieszczałem wiele komunikatów logowania zapisanych w pliku, więc miałem swoje instrukcje drukowania.

O zadaniu zadania, tak jak ja użyłem load_balanced_view(), wyszedłem do biblioteki i znalazłem drogę.

Jeśli liczba symulacji są duże (~ 1000), to zajmuje dużo czasu, aby dostać zaczęło widzę Procesory udusić się we wszystkich silnikach, ale żaden postęp oświadczenia drukujące są widoczne dopiero długi czas (~ 40 minut), a kiedy wykonam zobacz postęp, wydaje się, że duży blok (> 100) zadań trafił do tego samego silnika i oczekiwano ukończenia z tego jednego silnika przed przekazaniem pewnego postępu w postaci . Kiedy ten jeden silnik się ukończył, zobaczyłem, że obiekt ar dostarczył nowe odpowiedzi w ciągu 4 sekund - mogło to być opóźnienie czasowe w celu zapisania wyjściowych plików pikla.

Od dawna tego nie doświadczyłem, więc nie mogę nic powiedzieć.

Mam nadzieję, że to może rzucić trochę światła na twój problem.


PS: tak jak powiedziałem w komentarzu, mógłbyś spróbować multiprocessing.Pool. Wydaje mi się, że nie próbowałem udostępniać dużych, tylko do odczytu danych, jako zmiennych globalnych, które go używają. Chciałbym spróbować, ponieważ it seems to work.

+1

To nie skaluje się dobrze ze średnimi danymi, obawiam się. Na przykład, jeśli Twoje dane to 5 GB i używasz 8 silników na komputerze lokalnym z 8 rdzeniami, możesz skopiować 40 GB danych. –

+0

Hmm, miło wiedzieć, że teraz. Czy próbowałeś Pool z biblioteki wieloprocesowej? Wydaje mi się, że nie próbowałem udostępniać danych tylko do odczytu jako zmiennej globalnej za pomocą procesu wieloprocesowego. Chciałbym spróbować, ponieważ wydaje się działać (patrz: https://kaushikghose.wordpress.com/tag/python/). – paulochf

+1

Dzięki za twój wgląd. Wygląda na to, że [iPyParallel ewoluował] (https://ipyparallel.readthedocs.io/en/latest/details.html), ponieważ zadałem to pytanie i teraz używa PyZMQ do zarządzania wieloma komputerami klienckimi z ich pracownikami pracującymi nad wspólnym zestawem zadania. Przypomina to Redis Queuing, z którym łatwiej się pracuje i jest kierunkiem, w którym się znalazłem, ponieważ jest o wiele łatwiejsze niż iPyParallel – hamx0r

Powiązane problemy