2015-04-30 23 views
5

Jestem świadomy multiprocessing.Manager() i jak można go używać do tworzenia wspólnych obiektów, w szczególności kolejek, które mogą być wymieniane między pracownikami. Istnieje this question, this question, this question, a nawet one of my own questions.Udostępnianie wielu kolejek między procesami w Pythonie

Jednak muszę zdefiniować wiele kolejek, z których każda łączy określoną parę procesów. Powiedzmy, że każda para procesów i kolejka łącząca są identyfikowane przez zmienną key.

Chcę użyć słownika, aby uzyskać dostęp do moich kolejek, gdy potrzebuję umieścić i pobrać dane. Nie mogę tego zrobić. Próbowałem wielu rzeczy. Z multiprocessing importowane jako mp:

Definiowanie dict jak for key in all_keys: DICT[key] = mp.Queue w pliku konfiguracyjnym, który jest importowany przez moduł wieloprocesorowej (nazywają to multi.py) nie zwraca błędów, ale kolejka DICT[key] nie jest dzielone między procesami, każdy wydaje mieć własną kopię kolejki, a więc nie ma komunikacji.

Gdy próbuję zdefiniować DICT na początku funkcji main wieloprocesorowej, która definiuje procesy i zaczyna je, jak

DICT = mp.Manager().dict()  
for key in all_keys: 
    DICT[key] = mp.Queue() 

pojawia się błąd

RuntimeError: Queue objects should only be shared between processes through 
inheritance 

Zmiana na

DICT = mp.Manager().dict()  
for key in all_keys: 
    DICT[key] = mp.Manager().Queue() 

tylko pogarsza to wszystko. Próbowanie podobnych definicji na początku multi.py, a nie wewnątrz głównej funkcji, zwraca podobne błędy.

Musi istnieć sposób udostępniania wielu kolejek między procesami bez jawnego nazywania każdego z nich w kodzie. Jakieś pomysły?

Edit

Oto podstawowy schemat programu:

1- obciążenie pierwszy moduł, który definiuje pewne zmienne, import multi, uruchamia multi.main() i ładuje inny moduł, który uruchamia kaskadę obciążenia modułów i wykonania kodu. Tymczasem ...

2- multi.main wygląda następująco:

def main(): 
    manager = mp.Manager() 
    pool = mp.Pool() 
    DICT2 = manager.dict() 

    for key in all_keys: 
     DICT2[key] = manager.Queue() 
     proc_1 = pool.apply_async(targ1,(DICT1[key],)) #DICT1 is defined in the config file 
     proc_2 = pool.apply_async(targ2,(DICT2[key], otherargs,) 

Zamiast używać procesy pool i manager, ja również rozpoczęcie wraz z następujących powodów:

mp.Process(target=targ1, args=(DICT[key],)) 

3 - funkcja targ1 pobiera dane wejściowe, które nadchodzą (sortowane przez key) z głównego procesu. Ma on przekazać wynik do DICT[key], dzięki czemu targ2 może wykonać swoją pracę. Ta część nie działa. Istnieje dowolna liczba targ1 s, targ2 s itp., A zatem dowolna liczba kolejek.

4 - Wyniki niektórych z tych procesów zostaną przesłane do grupy różnych ramek danych pand, które są również indeksowane przez key i które chciałbym uzyskać z dowolnych procesów, nawet tych uruchomionych w innym moduł. Muszę jeszcze napisać tę część i może to być inne pytanie. (Wspominam o tym tutaj, ponieważ odpowiedź na powyższe 3 może również rozwiązać 4 ładnie.)

+0

W jaki sposób uruchamiasz swoje procesy podrzędne? Czy możesz utworzyć instancję 'Queue' przed uruchomieniem procesów? Czy pary procesów, o których mówisz, dotyczą potencjalnie dwóch procesów potomnych, czy też zawsze jest relacją rodzic-dziecko? – dano

+0

Hej człowieku, dodałem edycję, która opisuje mój program w krótkim zarysie. Tworzę instancję 'Queue' przed uruchomieniem procesów. Nie jestem pewien, jak odróżnić dziecko od procesu nadrzędnego, więc nie mogę odpowiedzieć na ostatnie pytanie ... – Wapiti

+0

Dlaczego 'DICT2' jest' manage.dict() 'w powyższym kodzie?Wygląda na to, że nie próbujesz przekazać obiektu 'DICT2' do żadnych dzieci. Czy nie może to być zwykły dyktat zawierający 'mp.Manager(). Queue()' instances? – dano

Odpowiedz

10

Wygląda na to, że twoje problemy zaczęły się, gdy próbujesz udostępnić numer multiprocessing.Queue(), przekazując go jako argument. Można obejść ten problem poprzez stworzenie managed queue zamiast:

import multiprocessing 
manager = mutiprocessing.Manager() 
passable_queue = manager.Queue() 

Podczas korzystania z menedżera, aby go utworzyć, jesteś przechowywanie i przekazywanie wokół pełnomocnika do kolejki, zamiast samej kolejce, więc nawet gdy obiekt przekazywany do procesów roboczych jest kopiowany, nadal wskazuje na tę samą podstawową strukturę danych: kolejkę. Jest bardzo podobny (w koncepcji) do wskaźników w C/C++. Jeśli utworzysz w ten sposób kolejki, będziesz mógł je przekazać po uruchomieniu procesu roboczego.

Ponieważ możesz teraz ominąć kolejki, nie musisz już zarządzać słownikiem. Zachowaj standardowy słownik, który będzie przechowywał wszystkie mapowania i nadaj swoim procesom roboczym potrzebne kolejek, aby nie potrzebowały dostępu do żadnych mapowań.

Napisałem o tym tutaj. Wygląda na to, że mijacie przedmioty pomiędzy swoimi pracownikami, więc to właśnie tutaj robimy. Wyobraźmy sobie, że mamy dwa etapy przetwarzania, a dane zaczynają się i kończą pod kontrolą main. Spójrz, jak możemy tworzyć kolejek, które łączą robotników jak rurociąg, ale nadając im tylko oni kolejek muszą, nie ma potrzeby, by o wszelkich odwzorowań:

import multiprocessing as mp 

def stage1(q_in, q_out): 

    q_out.put(q_in.get()+"Stage 1 did some work.\n") 
    return 

def stage2(q_in, q_out): 

    q_out.put(q_in.get()+"Stage 2 did some work.\n") 
    return 

def main(): 

    pool = mp.Pool() 
    manager = mp.Manager() 

    # create managed queues 
    q_main_to_s1 = manager.Queue() 
    q_s1_to_s2 = manager.Queue() 
    q_s2_to_main = manager.Queue() 

    # launch workers, passing them the queues they need 
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2)) 
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main)) 

    # Send a message into the pipeline 
    q_main_to_s1.put("Main started the job.\n") 

    # Wait for work to complete 
    print(q_s2_to_main.get()+"Main finished the job.") 

    pool.close() 
    pool.join() 

    return 

if __name__ == "__main__": 
    main() 

kod produkuje ten wyjście:

Główny rozpoczął pracę.
Etap 1 zrobił trochę pracy.
Etap 2 zrobił trochę pracy.
Główna zakończyła pracę.

nie zawierają przykładowe przechowywania kolejki lub AsyncResults obiektów w słownikach, bo nadal nie bardzo rozumiem, w jaki sposób program ma działać. Ale teraz, gdy możesz swobodnie przekazywać swoje kolejki, możesz zbudować słownik, aby zapisać mapowania kolejki/procesu w razie potrzeby.

W rzeczywistości, jeśli naprawdę tworzy się potok między wieloma pracownikami, nie trzeba nawet utrzymywać odniesienia do kolejek "między pracownikami" w main. Utwórz kolejki, przekazuj je swoim pracownikom, a następnie zachowaj odwołania do kolejek, które będą używane. Zdecydowanie zaleciłbym, aby stare kolejki były jak najszybciej zbiorem śmieci, jeśli naprawdę masz "dowolną liczbę" kolejek.

+0

Jeszcze raz dziękuję za szczegółową odpowiedź.To jest pomocne.Jestem trochę zdezorientowany. pierwotne pytanie dotyczyło tego, jak uniknąć pisania wielu definicji 'mp.Manager()', a zamiast tego umieścić wiele kolejek w dyktafonie. @dano pomógł mi rozwiązać to, co robiłem źle w komentarzach powyżej. Muszę czegoś pomijać w twoim przykładzie, ponieważ nie widzę sensu używania kolejki zarządzanej. Jeśli zamieniłem kolejki zarządzane na regularne i wyłączyłem menedżera i pulę i po prostu używam zwykłej składni wieloprocesowej, twój przykład też działa. Co dodaje żłób? – Wapiti

+0

@Wapiti Menedżer umożliwia przekazywanie Queues jako argumenty wejściowe do funkcji asynchronicznych. Rozwiązuje problem błędu "kolejki powinny być dziedziczone". Jeśli potrafisz wyjaśnić, co masz na myśli mówiąc, że "tylko pogorszyło sprawę", mogę pomóc ci to rozwiązać, ponieważ używanie menedżera jest tym, co powinieneś robić. Dict nie powinien być zarządzany, ponieważ nie jest współdzielony z żadnymi procesami poza głównym, a kiedy uruchamiasz procesy, przekazuj je zarówno w kolejce "w" i "na zewnątrz", i powinien działać. – skrrgwasme

+0

@Wapiti Ponadto, masz rację, że jeśli bezpośrednio używasz klasy 'multiprocessing.Process', możesz przekazać im normalne kolejki. Skoncentrowałem się na tym, aby kolejki pracowały dla ciebie z puli wieloprocesowej, ponieważ brzmiało to tak, jakbyś chciał tego użyć. Jeśli chcesz pozostać przy pulach, musisz nimi zarządzać, aby przekazać je jako argumenty. – skrrgwasme

Powiązane problemy