2015-02-19 57 views
5

Korzystanie asyncio w współprogram mogą być wykonywane z timeout tak robi anulowane po timeout:Python asyncio Timeout siły

@asyncio.coroutine 
def coro(): 
    yield from asyncio.sleep(10) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(asyncio.wait_for(coro(), 5)) 

Powyższy przykład działa zgodnie z oczekiwaniami (IT razy się po 5 sekundach).

Jednak, gdy coroutine nie używa asyncio.sleep() (lub innych asyncio coroutines), nie wydaje się upływać czas. Przykład:

@asyncio.coroutine 
def coro(): 
    import time 
    time.sleep(10) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(asyncio.wait_for(coro(), 1)) 

ta trwa dłużej niż 10 sekund, aby działać, ponieważ time.sleep(10) nie wyłącza się. Czy w takim przypadku możliwe jest egzekwowanie odwołania coroutine?

Jeśli do rozwiązania tego problemu należy zastosować asyncio, w jaki sposób mogę to zrobić?

Odpowiedz

12

Nie, nie można przerywać spotkania, chyba że przywróci ono pętlę zdarzeń, co oznacza, że ​​musi znajdować się wewnątrz połączenia yield from. asyncio jest jednowątkowe, więc jeśli blokujesz połączenie time.sleep(10) w twoim drugim przykładzie, pętla zdarzeń nie może być uruchomiona. Oznacza to, że po upływie ustawionego limitu czasu, który upłynął z użyciem wait_for, pętla zdarzeń nie będzie mogła podjąć na nim działania. Pętla zdarzeń nie ma możliwości ponownego uruchomienia, dopóki nie zostanie zakończona coro, w którym to momencie jest już za późno.

Z tego powodu należy unikać połączeń blokujących, które nie są asynchroniczne; za każdym razem, gdy połączenie blokuje się bez poddawania pętli zdarzeń, nic innego w twoim programie nie może zostać wykonane, co prawdopodobnie nie jest tym, czego potrzebujesz. Jeśli naprawdę trzeba zrobić długą, działanie blokujące, należy spróbować użyć BaseEventLoop.run_in_executor uruchomić go w wątku lub procesu basenie, który pozwoli uniknąć blokowania pętli zdarzenia:

import asyncio 
import time 
from concurrent.futures import ProcessPoolExecutor 

@asyncio.coroutine 
def coro(loop): 
    ex = ProcessPoolExecutor(2) 
    yield from loop.run_in_executor(ex, time.sleep, 10) # This can be interrupted. 

loop = asyncio.get_event_loop() 
loop.run_until_complete(asyncio.wait_for(coro(loop), 1)) 
+0

Kolejny przydatny przykład tutaj: https://github.com/calebmadrigal/asyncio-examples/blob/master/run_in_executor.py – shrx

1

THX @dano za odpowiedź. Jeśli uruchomiony coroutine nie jest trudne wymaganie, tutaj jest przerobiony, bardziej kompaktowa wersja

import asyncio, time, concurrent 

timeout = 0.5 
loop = asyncio.get_event_loop() 
future = asyncio.wait_for(loop.run_in_executor(None, time.sleep, 2), timeout) 
try: 
    loop.run_until_complete(future) 
    print('Thx for letting me sleep') 
except concurrent.futures.TimeoutError: 
    print('I need more sleep !') 

Dla ciekawskich, trochę debugowania w moim Python 3.5.2 wykazały, że przechodząc None jako wynikami executorów w tworzeniu _default_executor, w następujący sposób:

# _MAX_WORKERS = 5 
self._default_executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS) 
-1

Przykłady, które widziałem do obsługi limitu czasu są bardzo trywialne. Biorąc pod uwagę rzeczywistość, moja aplikacja jest nieco bardziej złożona. Sekwencja jest:

  1. Gdy klient łączy się z serwerem, serwer mają utworzyć kolejne połączenie do serwera wewnętrznego
  2. Gdy wewnętrzne połączenie z serwerem jest ok, czekać na klienta do wysyłania danych. Na podstawie tych danych możemy wysłać zapytanie do wewnętrznego serwera.
  3. Gdy są dane do wysłania na serwer wewnętrzny, wyślij go. Ponieważ wewnętrzny serwer czasami nie reaguje wystarczająco szybko, zawiń to żądanie do limitu czasu.
  4. Jeśli czasy pracy na zewnątrz, zwinąć wszystkie połączenia do sygnału klienta o błędzie

Aby osiągnąć wszystkie powyższe, utrzymując funkcjonowanie pętli zdarzeń, otrzymany kod zawiera poniższy kod:

def connection_made(self, transport): 
    self.client_lock_coro = self.client_lock.acquire() 
    asyncio.ensure_future(self.client_lock_coro).add_done_callback(self._got_client_lock) 

def _got_client_lock(self, task): 
    task.result() # True at this point, but call there will trigger any exceptions 
    coro = self.loop.create_connection(lambda: ClientProtocol(self), 
              self.connect_info[0], self.connect_info[1]) 
    asyncio.ensure_future(asyncio.wait_for(coro, 
              self.client_connect_timeout 
              )).add_done_callback(self.connected_server) 

def connected_server(self, task): 
    transport, client_object = task.result() 
    self.client_transport = transport 
    self.client_lock.release() 

def data_received(self, data_in): 
    asyncio.ensure_future(self.send_to_real_server(message, self.client_send_timeout)) 

def send_to_real_server(self, message, timeout=5.0): 
    yield from self.client_lock.acquire() 
    asyncio.ensure_future(asyncio.wait_for(self._send_to_real_server(message), 
                timeout, loop=self.loop) 
           ).add_done_callback(self.sent_to_real_server) 

@asyncio.coroutine 
def _send_to_real_server(self, message): 
    self.client_transport.write(message) 

def sent_to_real_server(self, task): 
    task.result() 
    self.client_lock.release() 
+0

Ta odpowiedź nie wydaje się odpowiadać na rzeczywiste pytanie, nie sądzę również, że jest to pomocne . (W związku z powyższym). Imo za dużo niepowiązanych rzeczy jest zrobione w kodzie, a faktyczna obsługa przekroczenia limitu czasu nie jest jasno przedstawiona. Mam nadzieję, że ta opinia pomoże. – siebz0r

+0

Dzięki za opinie. Rzeczywiste pytanie dotyczy coroutine może być wykonane z timeout, który mój kod robi. Jak stwierdziłem w mojej odpowiedzi, nie ma kodu, który można znaleźć w całym Internecie, gdzie coroutine jest wykonywany z timeoutem * bez * przy użyciu 'loop.run_until_complete()', dlatego to opublikowałem. Z uwagi na ograniczenia liczba metod/funkcji wydaje się obowiązkowa. Zachęcamy do zapewnienia bardziej zoptymalizowanego kodu. –