2011-09-06 13 views
5

Mam aplikację GUI, która musi pobrać i przeanalizować różne zasoby z sieci obok głównej pętli interfejsu GUI. Szukałem opcji przy użyciu modułu przetwarzania wieloprocesorowego Pythona, ponieważ te akcje pobierania zawierają nie tylko blokowanie IO, ale także zawierają ciężkie przetwarzanie, więc wieloprocesorowość może być tu lepszą opcją niż wątki Pythona. Byłoby łatwo używać Twisted, ale tym razem Twisted nie jest opcją.Asynchroniczne wywołanie w aplikacji GUI z wykorzystaniem wieloprocesowości

znalazłem proste rozwiązanie tutaj:

Python subprocess: callback when cmd exits

Problemem jest to, że zwrotna magicznie nie nazywa się wewnątrz w MainThread.

więc wymyślić następujące rozwiązanie:

delegate.py

import os 
import multiprocessing as mp 
import signal 
from collections import namedtuple 
import uuid 
import logging 


_CALLBACKS = {} 
_QUEUE = mp.Queue() 

info = logging.getLogger(__name__).info 


class Call(namedtuple('Call', 'id finished result error')): 

    def attach(self, func): 
     if not self.finished: 
      _CALLBACKS.setdefault(self.id, []).append(func) 
     else: 
      func(self.result or self.error) 

     return self 

    def callback(self): 
     assert self.finished, 'Call not finished yet' 
     r = self.result or self.error 
     for func in _CALLBACKS.pop(self.id, []): 
      func(r) 

    def done(self, result=None, error=None): 
     assert not self.finished, 'Call already finished' 
     return self._replace(finished=(-1 if error else 1), 
      result=result, error=error) 

    @classmethod 
    def create(clss): 
     call = clss(uuid.uuid4().hex, 0, None, None) # uuid ??? 
     return call 

def run(q, cb, func, args=None, kwargs=None): 
    info('run: try running %s' % func) 
    try: 
     cb = cb.done(result=func(*(args or()), **(kwargs or {}))) 
    except Exception, err: 
     cb = cb.done(error=err) 
    q.put(cb) 
    os.kill(os.getppid(), signal.SIGUSR2) # SIGUSR2 ??? 
    info('run: leaving') 

def on_callback(sig, frame): 
    info('on_callback: checking queue ...') 
    c = _QUEUE.get(True, 2) 
    info('on_callback: got call - %s' % repr(c)) 
    c.callback() 

signal.signal(signal.SIGUSR2, on_callback) # SIGUSR2 ??? 

def delegate(func, *args, **kwargs): 
    info('delegate: %s %s' % (func, args,)) 
    cb = Call.create() 
    mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start() 
    return cb 


__all__ = ['delegate'] 

Wykorzystanie

from delegate import delegate 

def sleeper(secs): 
    assert secs >= 1, 'I need my Augenpflege' 
    info('sleeper: will go to sleep for %s secs' % secs) 
    sleep(secs) 
    info('sleeper: woke up - returning result') 
    return ['sleeper', 'result'] 

def on_sleeper_result(r): 
    if isinstance(r, Exception): 
     info('on_sleeper_result: got error: %s' % r) 
    else: 
     info('on_sleeper_result: got result: %s' % r) 

from delegate import delegate 
delegate(sleeper, 3).attach(on_sleeper_result) 
delegate(sleeper, -3).attach(on_sleeper_result) 
while 1: 
    info('main: loop') 
    sleep(1) 

wyjście

0122 08432 MainThread INFO delegate: <function sleeper at 0x163e320> (3,) 
MainThread INFO delegate: <function sleeper at 0x163e320> (-3,) 
0124 08437 MainThread INFO run: try running <function sleeper at 0x163e320> 
0124 08437 MainThread INFO sleeper: will go to sleep for 3 secs 
0124 08432 MainThread INFO main: loop 
0125 08438 MainThread INFO run: try running <function sleeper at 0x163e320> 
0126 08438 MainThread INFO run: leaving 
0126 08432 MainThread INFO on_callback: checking queue ... 
0126 08432 MainThread INFO on_callback: got call - Call(id='057649cba7d840e3825aa5ac73248f78', finished=-1, result=None, error=AssertionError('I need my Augenpflege',)) 
0127 08432 MainThread INFO on_sleeper_result: got error: I need my Augenpflege 
0127 08432 MainThread INFO main: loop 
1128 08432 MainThread INFO main: loop 
2129 08432 MainThread INFO main: loop 
3127 08437 MainThread INFO sleeper: woke up - returning result 
3128 08437 MainThread INFO run: leaving 
3128 08432 MainThread INFO on_callback: checking queue ... 
3129 08432 MainThread INFO on_callback: got call - Call(id='041420c6c83a489aa5c7409c662d4917', finished=1, result=['sleeper', 'result'], error=None) 
3129 08432 MainThread INFO on_sleeper_result: got result: ['sleeper', 'result'] 
3129 08432 MainThread INFO main: loop 
4130 08432 MainThread INFO main: loop 
5132 08432 MainThread INFO main: loop 
... 

tej pory ten działa całkiem dobrze, ale moje doświadczenie z modułem wieloprocesowym są umiarkowane i nie jestem pewien, czy to będzie działać bez efektów. Moje pytanie to: - na co szczególnie zwracam uwagę podczas korzystania z wieloprocesowego przetwarzania w taki sposób ... lub czy istnieją "bardziej poprawne" wzorce dla asynchronicznego mechanizmu zwrotnego przy użyciu standardowej biblioteki python?

+0

Eli Bendersky napisał coś na ten temat: [LINK] (http://eli.thegreenplace.net/2011/05/26/code-sample-socket-client-based-on-twisted-with-pyqt /) – JBernardo

+0

Czytałem ten post na blogu wcześniej, ale jak już wspomniałem, Twisted nie jest opcją dla tego projektu - i tak dziękuję – hooblei

Odpowiedz

2

Nie ma powodu, aby używać sygnałów (low level api) do wieloprocesorowego przetwarzania w Pythonie i zajętego oczekiwania w głównej pętli.

Trzeba uruchomić (zmodyfikowany) pętlę zdarzeń w QThread, które mogą bezpośrednio wywoływać kod qt, lub użyć QApplication.postEvent (lub pyqtSignal), aby wykonać go w głównym wątku

# this should be in the delegate module 
while 1: 
    c = _QUEUE.get(True) # no timeout 
    c.callback() # or post event to main thread 

Można zobacz także this page do dyskusji na temat komunikacji między wątkami w qt

+0

Aaah - to jest to - wypchnąłem cały kod do QThread (lub lepiej QRunnable + QThreadPool) i to działa jak urok - dziękuję – hooblei

1

Twój kod działa, ale to nie jest tak proste, jak mogłoby być. Przejdźmy przez kod.

Tworzy instancję Call w głównym procesie:

def delegate(func, *args, **kwargs): 
    cb = Call.create() 

ale kiedy przechodzą cb do procesu roboczego,

mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start() 

instancja Call jest kopiowany podczas os.fork ING, tworząc w ten sposób druga, osobna instancja.Następnie wzywa cb.done i który wzywa cb._replace która zwraca trzecią instancję Call:

def done(self, result=None, error=None): 
    assert not self.finished, 'Call already finished' 
    return self._replace(finished=(-1 if error else 1), 
     result=result, error=error) 

Powyższy wywołuje prywatną metodę namedtuple _replace. mogłoby to być proste wypowiedzi Python jak

self.finished = -1 if error else 1 

jeśli Call były podklasą object zamiast podklasą namedtuple. Subclassing namedtuple zapisany kawałek wpisując w __init__ ale staje się dość niezdarny później ponieważ musimy zmodyfikować atrybuty namedtuple „s ...

Tymczasem oryginalny Call wystąpienie zwrócony przez delegate(...) w głównym procesie wzywa attach:

delegate(...).attach(on_sleeper_result) 

Modyfikuje globalny dyktaturę _CALLBACKS. Procesy robocze nie mają możliwości poznania tej zmiany w _CALLBACKS; w procesach roboczych _CALLBACKS jest wciąż pustym dict. Tak więc musisz przekazać instancję pracującą z powrotem do głównego procesu przez mp.Queue, która używa cb.id do odwoływania się do właściwych funkcji w _CALLBACKS.

Więc to wszystko działa, ale to tworzy trzy Call instancje dla każdego wywołania delegate, a kod może wprowadzać w błąd niewtajemniczonych do myślenia trzy Call przypadki są takie same obiekt .... To wszystko działa, ale to trochę skomplikowane.

Czy rozważałeś zamiast tego używać parametru mp.Pool.apply_async 'callback?

import multiprocessing as mp 
import logging 
import time 
import collections 

_CALLBACKS=collections.defaultdict(list) 

logger=mp.log_to_stderr(logging.DEBUG) 

def attach(name,func): 
    _CALLBACKS[name].append(func) 

def delegate(func, *args, **kwargs): 
    id=kwargs.pop('id') 
    try: 
     result=func(*args,**kwargs) 
    except Exception, err: 
     result=err 
    return (id,result) 

def sleeper(secs): 
    assert secs >= 1, 'I need my Augenpflege' 
    logger.info('sleeper: will go to sleep for %s secs' % secs) 
    time.sleep(secs) 
    logger.info('sleeper: woke up - returning result') 
    return ['sleeper', 'result'] 

def callback(r): 
    id,result=r 
    for func in _CALLBACKS[id]: 
     func(result) 

def on_sleeper_result(r): 
    if isinstance(r, Exception): 
     logger.error('on_sleeper_result: got error: %s' % r) 
    else: 
     logger.info('on_sleeper_result: got result: %s' % r) 

if __name__=='__main__': 
    pool=mp.Pool() 
    pool.apply_async(delegate,args=(sleeper, -3),kwds={'id':1}, 
        callback=callback) 
    attach(1,on_sleeper_result) 
    pool.apply_async(delegate,args=(sleeper, 3),kwds={'id':2}, 
        callback=callback) 
    attach(2,on_sleeper_result)  
    while 1: 
     logger.info('main: loop') 
     time.sleep(1) 
+0

Witam, byłem całkowicie świadomy wielu instancji wywołań podczas przechodzenia między dwóch procesów i wywołania _replace, ale wziąłem to dla czytelności, ponieważ te delegowane akcje byłyby stosunkowo rzadkie w aplikacji. Tak, to apply_async było moją pierwszą próbą, ale jak już wspomniałem - program obsługi wywołania zwrotnego był nie wywoływany w bieżącym wątku głównym. Możesz zobaczyć to samo zachowanie także tutaj: http://stackoverflow.com/questions/2581817/python-subprocess-callback-when-cmd-exits/5209746#5209746 Ale dziękuję za szczegółowe wyjaśnienie. – hooblei

Powiązane problemy