17

Mam wielowątkową aplikację Pythona. Chcę uruchomić pętlę asyncio w wątku, a następnie opublikowac kalki i współprogramy z innego wątku. Powinno być łatwe, ale nie mogę się skupić na rzeczach asyncio.python asyncio, jak tworzyć i anulować zadania z innego wątku

wpadłem do następnego rozwiązania, które robi połowę tego, co chcę, czuć się swobodnie wypowiedzieć się na temat wszystkiego:

import asyncio 
from threading import Thread 

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) #why do I need that?? 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     f = functools.partial(self.loop.create_task, coro) 
     return self.loop.call_soon_threadsafe(f) 

    def cancel_task(self, xx): 
     #no idea 

@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

b.start() 
time.sleep(1) #need to wait for loop to start 
t = b.add_task(test()) 
time.sleep(10) 
#here the program runs fine but how can I cancel the task? 

b.stop() 

Więc uruchamiania i zatrzymać pętlę działa dobrze. Myślałem o tworzeniu zadania przy użyciu create_task, ale ta metoda nie jest bezpieczna dla wątków, więc zapakowałem ją w call_soon_threadsafe. Ale chciałbym móc uzyskać obiekt zadania, aby móc anulować zadanie. Mogę zrobić skomplikowane rzeczy używając Future and Condition, ale musi być prostszy sposób, prawda?

Odpowiedz

13

Myślę, że być może trzeba będzie podać swojej metodzie add_task, czy jest ona wywoływana z wątku innego niż pętla zdarzeń. W ten sposób, jeśli zostanie wywołany z tego samego wątku, możesz po prostu zadzwonić bezpośrednio pod numer asyncio.async, w przeciwnym razie może wykonać dodatkową pracę, aby przekazać zadanie z wątku pętli do wątku wywołującego. Oto przykład:

import time 
import asyncio 
import functools 
from threading import Thread, current_thread, Event 
from concurrent.futures import Future 

class B(Thread): 
    def __init__(self, start_event): 
     Thread.__init__(self) 
     self.loop = None 
     self.tid = None 
     self.event = start_event 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.tid = current_thread() 
     self.loop.call_soon(self.event.set) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     def _async_add(func, fut): 
      try: 
       ret = func() 
       fut.set_result(ret) 
      except Exception as e: 
       fut.set_exception(e) 

     f = functools.partial(asyncio.async, coro, loop=self.loop) 
     if current_thread() == self.tid: 
      return f() # We can call directly if we're not going between threads. 
     else: 
      # We're in a non-event loop thread so we use a Future 
      # to get the task from the event loop thread once 
      # it's ready. 
      fut = Future() 
      self.loop.call_soon_threadsafe(_async_add, f, fut) 
      return fut.result() 

    def cancel_task(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 


@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

event = Event() 
b = B(event) 
b.start() 
event.wait() # Let the loop's thread signal us, rather than sleeping 
t = b.add_task(test()) # This is a real task 
time.sleep(10) 
b.stop() 

Najpierw zapisać id wątku pętli zdarzeń w metodzie run, dzięki czemu możemy dowiedzieć się, czy połączenia z add_task pochodzą z innych wątków później. Jeśli wywoływane jest add_task z wątku pętli bez zdarzeń, używamy call_soon_threadsafe do wywołania funkcji, która będzie zarówno zaplanować coroutine, a następnie użyć concurrent.futures.Future, aby przekazać zadanie z powrotem do wątku wywołującego, który czeka na wynik Future.

Uwaga na anulowanie zadania: Ty kiedy zadzwonić cancel na Task, A CancelledError zostanie podniesiona w współprogram następnym razem biegnie pętla zdarzeń. Oznacza to, że współprowadzący, że zadanie jest owijane, zostanie przerwany z powodu wyjątku, gdy następnym razem osiągnie granicę plastyczności - chyba że coroutine złapie CancelledError i uniemożliwi sobie przerwanie. Zauważ także, że działa to tylko wtedy, gdy funkcja, która jest zawijana, jest w rzeczywistości przerywaną postacią; na przykład asyncio.Future, zwrócony przez BaseEventLoop.run_in_executor, tak naprawdę nie może zostać anulowany, ponieważ jest faktycznie owinięty wokół concurrent.futures.Future, a tych nie można anulować, gdy ich podstawowa funkcja faktycznie zacznie działać. W takich przypadkach asyncio.Future powie, że zostało anulowane, ale funkcja faktycznie uruchomiona w executorze będzie nadal działać.

Edit: Zaktualizowany pierwszy przykład użyć concurrent.futures.Future, zamiast queue.Queue, za sugestią Andrew Světlov użytkownika.

Uwaga: Uwaga: asyncio.async jest przestarzałe od wersji 3.4.4 zamiast tego używa asyncio.ensure_future.

+0

Dzięki za przykład pomógł mi rozwiązać kilka problemów, które miałem. Btw także musiałem zainicjować Future with Future (loop = self.loop), inaczej w niektórych przypadkach przyszłość przyjąłaby niewłaściwą pętlę –

+0

@OlivierRD Powinieneś używać 'concurrent.futures.Future', a nie' asyncio.Future'. 'concurrent.futures.Future' nie przyjmuje arytmetycznego słowa' loop'. – dano

+0

Dokumentacja wydaje się mówić: https://docs.python.org/3/library/asyncio-task.html#asyncio.Future –

6

Wszystko robisz dobrze. do zatrzymywania zadania make metody

class B(Thread): 
    # ... 
    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 

BTW ty mieć do konfiguracji pętli zdarzenia dla utworzonego wątku w sposób jawny przez

self.loop = asyncio.new_event_loop() 
asyncio.set_event_loop(self.loop) 

ponieważ asyncio tworzy niejawny pętlę zdarzeń tylko dla głównego wątku.

+0

brakujące ogniwo tutaj jest to, jak uzyskać dojście do 'task' w pierwszej kolejności. Ponieważ OP musi używać 'call_soon_threadsafe (self.loop.create_task)' w metodzie 'add_task', w rzeczywistości nie ma uchwytu do zadania po dodaniu go do pętli. – dano

+1

Rozumiem. Masz rację. @dano BTW możesz użyć concurrent.futures.Future zamiast Queue w swojej odpowiedzi. Myślę, że to czystsze. –

+0

Tak, zgadzam się, że użycie 'Future' jest ładniejsze niż' Queue'. Zaktualizowałem swoją odpowiedź, aby to odzwierciedlić. Dzięki! – dano

5

tylko w celach informacyjnych tutaj kod, który w końcu zaimplementowałem w oparciu o pomoc, którą otrzymałem na tej stronie, jest prostszy, ponieważ nie potrzebowałem wszystkich funkcji. dzięki jeszcze raz!

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def _add_task(self, future, coro): 
     task = self.loop.create_task(coro) 
     future.set_result(task) 

    def add_task(self, coro): 
     future = Future() 
     p = functools.partial(self._add_task, future, coro) 
     self.loop.call_soon_threadsafe(p) 
     return future.result() #block until result is available 

    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 
2

Od wersji 3.4.4 asyncio zapewnia funkcję o nazwie run_coroutine_threadsafe złożyć współprogram obiekt z wątku do pętli zdarzeń. Zwraca concurrent.futures.Future, aby uzyskać dostęp do wyniku lub anulować zadanie.

Korzystanie przykład:

@asyncio.coroutine 
def test(loop): 
    try: 
     while True: 
      print("Running") 
      yield from asyncio.sleep(1, loop=loop) 
    except asyncio.CancelledError: 
     print("Cancelled") 
     loop.stop() 
     raise 

loop = asyncio.new_event_loop() 
thread = threading.Thread(target=loop.run_forever) 
future = asyncio.run_coroutine_threadsafe(test(loop), loop) 

thread.start() 
time.sleep(5) 
future.cancel() 
thread.join() 
+0

Aby uniknąć sytuacji wyścigu lub zakleszczenia, nie wywołuj bezpośrednio 'future.cancel()'. Zamiast tego użyj 'loop.call_soon_threadsafe (future.cancel)'. Zobacz [tutaj] (https://docs.python.org/3.4/library/asyncio-dev.html#concurrency-and-mithithreading). – changyuheng

+1

@ ChangYu-heng Dotyczy to [asyncio.Future] (https://docs.python.org/3.4/library/asyncio-task.html#asyncio.Future) kontraktów futures, ale [run_coroutine_threadsafe] (https: // docs.python.org/3.4/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) zwraca [concurrent.futures.Future] (https://docs.python.org/3.4/library/concurrent.futures.html# concurrent.futures.Future), który jest wątkowo bezpieczny i nie zależy od żadnej pętli zdarzeń. – Vincent

+0

@Vicent Przepraszamy, nie przeczytałem dokładnie oryginalnego pytania. Więc dodatkowy komentarz do tego byłby: użyj 'loop.call_soon_threadsafe (future.cancel)' jeśli zamierzasz wykonać 'future.cancel()' z wątku, który nie jest pętlą zdarzeń. – changyuheng

Powiązane problemy