2013-03-09 28 views
29

Próbuję uruchomić trochę kodu Pythona na kilka plików równolegle. Konstrukt jest zasadniczo:Pula wieloprocesorowa Pythona zawiesza się przy łączeniu?

def process_file(filename, foo, bar, baz=biz): 
    # do stuff that may fail and cause exception 

if __name__ == '__main__': 
    # setup code setting parameters foo, bar, and biz 

    psize = multiprocessing.cpu_count()*2 
    pool = multiprocessing.Pool(processes=psize) 

    map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:]) 
    pool.close() 
    pool.join() 

ja wcześniej używany pool.map zrobić coś podobnego i to działało świetnie, ale nie wydaje się, że tu użyć, ponieważ pool.map nie (wydaje się) pozwól mi przekazać dodatkowe argumenty (i użycie lambda to nie zadziała, ponieważ lambda nie może zostać zmobilizowana).

Tak więc teraz próbuję uruchomić rzeczy za pomocą funkcji apply_async() bezpośrednio. Mój problem polega na tym, że kod wydaje się zawieszać i nigdy się nie kończy. Kilka plików kończy się niepowodzeniem z wyjątkiem, ale nie rozumiem, dlaczego to, co spowodowałoby niepowodzenie/zawieszenie połączenia? Co ciekawe, jeśli żaden z plików nie zakończy się niepowodzeniem z wyjątkiem, to program kończy działanie w sposób czysty.

Czego mi brakuje?

Edit: Gdy funkcja (a zatem pracownik) nie widzę ten wyjątek:

Exception in thread Thread-3: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results 
    task = get() 
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>,()) 

Jeśli widzę nawet jeden z nich, proces nadrzędny proces zawiesza się zawsze, nigdy czerpania dzieci i wysiadanie .

+0

Twój kod wydaje się działać dobrze, nawet jeśli wrzucam losowe wyjątki do 'process_file'. Być może ma to związek z tym, co robisz w 'process_file', który powoduje problemy. – robertklep

+0

Huh. jaka wersja pythona? Mam 2,7. Plik process_file w prawdziwym programie jest dość skomplikowany, wykorzystując w dużym stopniu biblioteki PIL, NetworkX, poly2tri i inne. Znam co najmniej 2 miejsca, w których poznałem błędy, które mogą powodować wyjątki w niektórych przypadkach, ale muszę po prostu zignorować te błędy i przejść dalej. Zastanawiam się, dlaczego to nigdy nie wyjdzie dla mnie, ale pracuję dla ciebie. – clemej

+0

2.7.2, to jest to, co testowałem z: https://gist.github.com/robertklep/5125319 – robertklep

Odpowiedz

40

Przepraszam za odpowiedź na moje pytanie, ale znalazłem przynajmniej obejście, więc na wypadek, gdyby ktoś miał podobny problem, chcę go opublikować tutaj. Przyjmuję lepsze odpowiedzi tam.

Wierzę, że źródłem problemu jest http://bugs.python.org/issue9400. To mówi mi dwie rzeczy:

  • ja nie zwariowałem, co staram się robić naprawdę ma pracować
  • przynajmniej python2, to jest bardzo trudne, jeśli nie niemożliwe, aby w marynacie „wyjątki” powrót do procesu nadrzędnego. Proste działają, ale wiele innych nie.

W moim przypadku moja funkcja pracownika uruchomiła podproces, który był segfault. Ten zwrócony wyjątek CalledProcessError, który nie jest dostępny do pobrania. Z jakiegoś powodu powoduje to, że obiekt puli w obiekcie macierzystym wychodzi na lunch i nie zwraca się z połączenia do połączenia().

W moim szczególnym przypadku, nie obchodzi mnie, czym był wyjątek. Co najwyżej chcę go zarejestrować i kontynuować. Aby to zrobić, po prostu zawijam funkcję top worker w klauzuli try/except. Jeśli pracownik zgłosi wyjątek, zostanie on przechwycony przed próbą powrotu do procesu nadrzędnego, zarejestrowany, a następnie proces roboczy zostanie zakończony normalnie, ponieważ nie będzie już próbował wysłać wyjątku. Zobacz poniżej:

def process_file_wrapped(filenamen, foo, bar, baz=biz): 
    try: 
     process_file(filename, foo, bar, baz=biz) 
    except: 
     print('%s: %s' % (filename, traceback.format_exc())) 

Następnie mam moją początkową funkcję mapową wywołanie process_file_wrapped() zamiast oryginalnej. Teraz mój kod działa zgodnie z przeznaczeniem.

+7

Nie musisz przepraszać za odpowiedź na własne pytanie. Ta strona teraz dokumentuje prawdziwy problem z obejściem. Dobre. –

+1

Nawiasem mówiąc, innym rozwiązaniem może być pobranie komunikatu o błędzie wyjątku i podniesienie go przy użyciu podstawowej klasy "Exception", która, jak sądzę, jest łatwa do zniesienia. –

+1

Wciąż jestem nowicjuszem w StackExchange i nie mam pewności co do etykiety. Ponieważ fragment komentarza @robertklep z powyższego komentarza działa z prostym wyjątkiem(), podejrzewam, że też byłoby w porządku ... ale najważniejsze jest to, że musisz wychwycić wszystkie wyjątki i zwrócić to, co znane. – clemej

4

Można rzeczywiście użyć instancji functools.partial zamiast lambda w przypadkach, gdy obiekt wymaga piklowania. partial obiekty są dostępne od Pythona 2.7 (oraz w Pythonie 3).

pool.map(functools.partial(process_file, x, foo, bar, baz=biz), sys.argv[1:]) 
+0

Hmm. Nie używałem functools wcześniej. Dzięki za informację. Nadal podejrzewam, że nadal będzie cierpiał na ten sam problem z wyjątkami i propozycjami. – clemej

+0

Prawdopodobnie; Nie mogę powiedzieć. Wspomniałeś, że poprzednio odniosłeś sukces z 'pool.map', więc może to pomoże. – nneonneo

+0

Użyłem pool.map w zupełnie innym kontekście, w którym rzeczy nie mogły powodować wyjątków. Powinienem był to wyjaśnić w tym pytaniu. – clemej

2

Co jest warte, miałem podobny błąd (nie taki sam), gdy zawieszono pool.map. Mój przypadek użycia pozwolił mi rozwiązać problem za pomocą pool.terminate (upewnij się, że wszystko działa również przed zmianą).

użyłem pool.map przed wywołaniem terminate więc wiem wszystko wykończone, z docs:

równoległy odpowiednik mapie() funkcja wbudowana (obsługuje tylko jeden argument, choć iterable). Blokuje się, aż wynik będzie gotowy.

Jeśli to jest twój przypadek użycia, może to być sposób na poprawienie tego.

Powiązane problemy