Jestem świadomy multiprocessing.Manager()
i jak można go używać do tworzenia wspólnych obiektów, w szczególności kolejek, które mogą być wymieniane między pracownikami. Istnieje this question, this question, this question, a nawet one of my own questions.Udostępnianie wielu kolejek między procesami w Pythonie
Jednak muszę zdefiniować wiele kolejek, z których każda łączy określoną parę procesów. Powiedzmy, że każda para procesów i kolejka łącząca są identyfikowane przez zmienną key
.
Chcę użyć słownika, aby uzyskać dostęp do moich kolejek, gdy potrzebuję umieścić i pobrać dane. Nie mogę tego zrobić. Próbowałem wielu rzeczy. Z multiprocessing
importowane jako mp
:
Definiowanie dict jak for key in all_keys: DICT[key] = mp.Queue
w pliku konfiguracyjnym, który jest importowany przez moduł wieloprocesorowej (nazywają to multi.py
) nie zwraca błędów, ale kolejka DICT[key]
nie jest dzielone między procesami, każdy wydaje mieć własną kopię kolejki, a więc nie ma komunikacji.
Gdy próbuję zdefiniować DICT
na początku funkcji main wieloprocesorowej, która definiuje procesy i zaczyna je, jak
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
pojawia się błąd
RuntimeError: Queue objects should only be shared between processes through
inheritance
Zmiana na
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
tylko pogarsza to wszystko. Próbowanie podobnych definicji na początku multi.py
, a nie wewnątrz głównej funkcji, zwraca podobne błędy.
Musi istnieć sposób udostępniania wielu kolejek między procesami bez jawnego nazywania każdego z nich w kodzie. Jakieś pomysły?
Edit
Oto podstawowy schemat programu:
1- obciążenie pierwszy moduł, który definiuje pewne zmienne, import multi
, uruchamia multi.main()
i ładuje inny moduł, który uruchamia kaskadę obciążenia modułów i wykonania kodu. Tymczasem ...
2- multi.main
wygląda następująco:
def main():
manager = mp.Manager()
pool = mp.Pool()
DICT2 = manager.dict()
for key in all_keys:
DICT2[key] = manager.Queue()
proc_1 = pool.apply_async(targ1,(DICT1[key],)) #DICT1 is defined in the config file
proc_2 = pool.apply_async(targ2,(DICT2[key], otherargs,)
Zamiast używać procesy pool
i manager
, ja również rozpoczęcie wraz z następujących powodów:
mp.Process(target=targ1, args=(DICT[key],))
3 - funkcja targ1
pobiera dane wejściowe, które nadchodzą (sortowane przez key
) z głównego procesu. Ma on przekazać wynik do DICT[key]
, dzięki czemu targ2
może wykonać swoją pracę. Ta część nie działa. Istnieje dowolna liczba targ1
s, targ2
s itp., A zatem dowolna liczba kolejek.
4 - Wyniki niektórych z tych procesów zostaną przesłane do grupy różnych ramek danych pand, które są również indeksowane przez key
i które chciałbym uzyskać z dowolnych procesów, nawet tych uruchomionych w innym moduł. Muszę jeszcze napisać tę część i może to być inne pytanie. (Wspominam o tym tutaj, ponieważ odpowiedź na powyższe 3 może również rozwiązać 4 ładnie.)
W jaki sposób uruchamiasz swoje procesy podrzędne? Czy możesz utworzyć instancję 'Queue' przed uruchomieniem procesów? Czy pary procesów, o których mówisz, dotyczą potencjalnie dwóch procesów potomnych, czy też zawsze jest relacją rodzic-dziecko? – dano
Hej człowieku, dodałem edycję, która opisuje mój program w krótkim zarysie. Tworzę instancję 'Queue' przed uruchomieniem procesów. Nie jestem pewien, jak odróżnić dziecko od procesu nadrzędnego, więc nie mogę odpowiedzieć na ostatnie pytanie ... – Wapiti
Dlaczego 'DICT2' jest' manage.dict() 'w powyższym kodzie?Wygląda na to, że nie próbujesz przekazać obiektu 'DICT2' do żadnych dzieci. Czy nie może to być zwykły dyktat zawierający 'mp.Manager(). Queue()' instances? – dano