Mam aplikację GUI, która musi pobrać i przeanalizować różne zasoby z sieci obok głównej pętli interfejsu GUI. Szukałem opcji przy użyciu modułu przetwarzania wieloprocesorowego Pythona, ponieważ te akcje pobierania zawierają nie tylko blokowanie IO, ale także zawierają ciężkie przetwarzanie, więc wieloprocesorowość może być tu lepszą opcją niż wątki Pythona. Byłoby łatwo używać Twisted, ale tym razem Twisted nie jest opcją.Asynchroniczne wywołanie w aplikacji GUI z wykorzystaniem wieloprocesowości
znalazłem proste rozwiązanie tutaj:
Python subprocess: callback when cmd exits
Problemem jest to, że zwrotna magicznie nie nazywa się wewnątrz w MainThread.
więc wymyślić następujące rozwiązanie:
delegate.py
import os
import multiprocessing as mp
import signal
from collections import namedtuple
import uuid
import logging
_CALLBACKS = {}
_QUEUE = mp.Queue()
info = logging.getLogger(__name__).info
class Call(namedtuple('Call', 'id finished result error')):
def attach(self, func):
if not self.finished:
_CALLBACKS.setdefault(self.id, []).append(func)
else:
func(self.result or self.error)
return self
def callback(self):
assert self.finished, 'Call not finished yet'
r = self.result or self.error
for func in _CALLBACKS.pop(self.id, []):
func(r)
def done(self, result=None, error=None):
assert not self.finished, 'Call already finished'
return self._replace(finished=(-1 if error else 1),
result=result, error=error)
@classmethod
def create(clss):
call = clss(uuid.uuid4().hex, 0, None, None) # uuid ???
return call
def run(q, cb, func, args=None, kwargs=None):
info('run: try running %s' % func)
try:
cb = cb.done(result=func(*(args or()), **(kwargs or {})))
except Exception, err:
cb = cb.done(error=err)
q.put(cb)
os.kill(os.getppid(), signal.SIGUSR2) # SIGUSR2 ???
info('run: leaving')
def on_callback(sig, frame):
info('on_callback: checking queue ...')
c = _QUEUE.get(True, 2)
info('on_callback: got call - %s' % repr(c))
c.callback()
signal.signal(signal.SIGUSR2, on_callback) # SIGUSR2 ???
def delegate(func, *args, **kwargs):
info('delegate: %s %s' % (func, args,))
cb = Call.create()
mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start()
return cb
__all__ = ['delegate']
Wykorzystanie
from delegate import delegate
def sleeper(secs):
assert secs >= 1, 'I need my Augenpflege'
info('sleeper: will go to sleep for %s secs' % secs)
sleep(secs)
info('sleeper: woke up - returning result')
return ['sleeper', 'result']
def on_sleeper_result(r):
if isinstance(r, Exception):
info('on_sleeper_result: got error: %s' % r)
else:
info('on_sleeper_result: got result: %s' % r)
from delegate import delegate
delegate(sleeper, 3).attach(on_sleeper_result)
delegate(sleeper, -3).attach(on_sleeper_result)
while 1:
info('main: loop')
sleep(1)
wyjście
0122 08432 MainThread INFO delegate: <function sleeper at 0x163e320> (3,)
MainThread INFO delegate: <function sleeper at 0x163e320> (-3,)
0124 08437 MainThread INFO run: try running <function sleeper at 0x163e320>
0124 08437 MainThread INFO sleeper: will go to sleep for 3 secs
0124 08432 MainThread INFO main: loop
0125 08438 MainThread INFO run: try running <function sleeper at 0x163e320>
0126 08438 MainThread INFO run: leaving
0126 08432 MainThread INFO on_callback: checking queue ...
0126 08432 MainThread INFO on_callback: got call - Call(id='057649cba7d840e3825aa5ac73248f78', finished=-1, result=None, error=AssertionError('I need my Augenpflege',))
0127 08432 MainThread INFO on_sleeper_result: got error: I need my Augenpflege
0127 08432 MainThread INFO main: loop
1128 08432 MainThread INFO main: loop
2129 08432 MainThread INFO main: loop
3127 08437 MainThread INFO sleeper: woke up - returning result
3128 08437 MainThread INFO run: leaving
3128 08432 MainThread INFO on_callback: checking queue ...
3129 08432 MainThread INFO on_callback: got call - Call(id='041420c6c83a489aa5c7409c662d4917', finished=1, result=['sleeper', 'result'], error=None)
3129 08432 MainThread INFO on_sleeper_result: got result: ['sleeper', 'result']
3129 08432 MainThread INFO main: loop
4130 08432 MainThread INFO main: loop
5132 08432 MainThread INFO main: loop
...
tej pory ten działa całkiem dobrze, ale moje doświadczenie z modułem wieloprocesowym są umiarkowane i nie jestem pewien, czy to będzie działać bez efektów. Moje pytanie to: - na co szczególnie zwracam uwagę podczas korzystania z wieloprocesowego przetwarzania w taki sposób ... lub czy istnieją "bardziej poprawne" wzorce dla asynchronicznego mechanizmu zwrotnego przy użyciu standardowej biblioteki python?
Eli Bendersky napisał coś na ten temat: [LINK] (http://eli.thegreenplace.net/2011/05/26/code-sample-socket-client-based-on-twisted-with-pyqt /) – JBernardo
Czytałem ten post na blogu wcześniej, ale jak już wspomniałem, Twisted nie jest opcją dla tego projektu - i tak dziękuję – hooblei