2012-09-07 13 views
15

Jeśli mam dwa obiekty threading.Event() i chcę spać, dopóki nie zostanie ustawione jedno z nich, czy istnieje skuteczny sposób na zrobienie tego w pythonie? Najwyraźniej mógłbym zrobić coś z odpytywaniem/przekroczeniem czasu, ale naprawdę chciałbym, aby wątek pozostał w stanie uśpienia, dopóki nie zostanie ustawiony jeden, podobny do tego, jak select jest używany do deskryptorów plików.Python threading: czy mogę spać jednocześnie na dwóch threading.Event()?

Jak będzie wyglądać następująca implementacja, jeśli chodzi o efektywną metodę niepolowania pod numerem wait_for_either?

a = threading.Event() 
b = threading.Event() 

wait_for_either(a, b) 
+0

Czy istnieje dobry powód używania dwóch różnych zdarzeń i nie używania tego samego? –

+0

@Iulius masz pojedynczy wątek, który ma być sterowany zdarzeniami, ale ma 2 kolejki ... więc musisz się obudzić, gdy q otrzyma element – pyInTheSky

+0

Jestem zaskoczony, że Python nie ma tego wbudowanego. –

Odpowiedz

18

Oto nie odpytywania unikanie nadmiernego roztwór gwint: modyfikowanie istniejących Event S na ogień zwrotnego, gdy zmieniają się i chwytać ustawiania nowego zdarzenia w tym zwrotnego:

import threading 

def or_set(self): 
    self._set() 
    self.changed() 

def or_clear(self): 
    self._clear() 
    self.changed() 

def orify(e, changed_callback): 
    e._set = e.set 
    e._clear = e.clear 
    e.changed = changed_callback 
    e.set = lambda: or_set(e) 
    e.clear = lambda: or_clear(e) 

def OrEvent(*events): 
    or_event = threading.Event() 
    def changed(): 
     bools = [e.is_set() for e in events] 
     if any(bools): 
      or_event.set() 
     else: 
      or_event.clear() 
    for e in events: 
     orify(e, changed) 
    changed() 
    return or_event 

wykorzystanie próbki:

def wait_on(name, e): 
    print "Waiting on %s..." % (name,) 
    e.wait() 
    print "%s fired!" % (name,) 

def test(): 
    import time 

    e1 = threading.Event() 
    e2 = threading.Event() 

    or_e = OrEvent(e1, e2) 

    threading.Thread(target=wait_on, args=('e1', e1)).start() 
    time.sleep(0.05) 
    threading.Thread(target=wait_on, args=('e2', e2)).start() 
    time.sleep(0.05) 
    threading.Thread(target=wait_on, args=('or_e', or_e)).start() 
    time.sleep(0.05) 

    print "Firing e1 in 2 seconds..." 
    time.sleep(2) 
    e1.set() 
    time.sleep(0.05) 

    print "Firing e2 in 2 seconds..." 
    time.sleep(2) 
    e2.set() 
    time.sleep(0.05) 

wynikiem której:

Waiting on e1... 
Waiting on e2... 
Waiting on or_e... 
Firing e1 in 2 seconds... 
e1 fired!or_e fired! 

Firing e2 in 2 seconds... 
e2 fired! 

Th powinien być bezpieczny dla wątków. Wszelkie komentarze są mile widziane.

EDYCJA: Och, a tu jest twoja funkcja wait_for_either, chociaż sposób, w jaki napisałem kod, najlepiej jest wykonać i przekazać około or_event. Zauważ, że or_event nie należy ustawiać ani czyścić ręcznie.

def wait_for_either(e1, e2): 
    OrEvent(e1, e2).wait() 
+2

To jest miłe! Jednak widzę jeden problem: jeśli "orify" to samo zdarzenie dwa razy, otrzymasz nieskończoną pętlę za każdym razem, gdy ją ustawisz lub wyczyścisz. – Vincent

+0

To dobry punkt! Zmodyfikuje wkrótce – Claudiu

+0

Dziękuję bardzo za to! Właśnie tego szukałem. Czy zgadzasz się, aby kod w tej odpowiedzi był używany w warunkach licencji open source? BSD lub MIT byłyby idealne, ponieważ są kompatybilne z Numpy, Pandami, Scipy itp. – naitsirhc

4

Jednym z rozwiązań (z odpytywania) byłoby zrobić kolejnych czeka na każdym Event w pętli

def wait_for_either(a, b): 
    while True: 
     if a.wait(tunable_timeout): 
      break 
     if b.wait(tunable_timeout): 
      break 

myślę, że jeśli dostroić timeout na tyle dobrze, wyniki byłyby OK.


Najlepszym bez odpytywania mogę myśleć, to czekać na każdego z nich w innym wątku i ustawić wspólny Event kogo będzie czekać po w głównym wątku.

def repeat_trigger(waiter, trigger): 
    waiter.wait() 
    trigger.set() 

def wait_for_either(a, b): 
    trigger = threading.Event() 
    ta = threading.Thread(target=repeat_trigger, args=(a, trigger)) 
    tb = threading.Thread(target=repeat_trigger, args=(b, trigger)) 
    ta.start() 
    tb.start() 
    # Now do the union waiting 
    trigger.wait() 

Dość interesujący, więc napisałem wersji OOP poprzedniego rozwiązania:

class EventUnion(object): 
    """Register Event objects and wait for release when any of them is set""" 
    def __init__(self, ev_list=None): 
     self._trigger = Event() 
     if ev_list: 
      # Make a list of threads, one for each Event 
      self._t_list = [ 
       Thread(target=self._triggerer, args=(ev,)) 
       for ev in ev_list 
      ] 
     else: 
      self._t_list = [] 

    def register(self, ev): 
     """Register a new Event""" 
     self._t_list.append(Thread(target=self._triggerer, args=(ev,))) 

    def wait(self, timeout=None): 
     """Start waiting until any one of the registred Event is set""" 
     # Start all the threads 
     map(lambda t: t.start(), self._t_list) 
     # Now do the union waiting 
     return self._trigger.wait(timeout) 

    def _triggerer(self, ev): 
     ev.wait() 
     self._trigger.set() 
+1

można zrobić repeat_trigger również sprawdzić dla wyzwalacza (z timeout = 0 dla wyzwalacza i timeout> 0 dla kelnera) tak, że wszystkie wątki ostatecznie kończą się –

+0

i myślałem tak samo, ale musi być lepszy sposób niż rozpoczęcie 2 wątków ... – Claudiu

0

Nie dość, ale można korzystać z dwóch dodatkowych tematów do multipleksowania wydarzenia ...

def wait_for_either(a, b): 
    flag = False #some condition variable, event, or similar 

    class Event_Waiter(threading.Thread): 
    def __init__(self, event): 
     self.e = event 
    def run(self): 
     self.e.wait() 
     flag.set() 

    a_thread = Event_Waiter(a) 
    b_thread = Event_Waiter(b) 
    a.start() 
    b.start() 
    flag.wait() 

Uwaga, być może trzeba będzie się martwić o przypadkowe dostanie obu zdarzeń, jeśli przybywają zbyt szybko. Pomocnicze wątki (a_thread i b_thread) powinny blokować synchronizację, próbując ustawić flagę, a następnie powinny zabić drugi wątek (ewentualnie resetując zdarzenie tego wątku, jeśli zostało zużyte).

1

Uruchamianie dodatkowych wątków wydaje się być jasnym rozwiązaniem, niezbyt skutecznym. Funkcja wait_events zablokuje użycie dowolnego z wydarzeń.

def wait_events(*events): 
    event_share = Event() 

    def set_event_share(event): 
     event.wait() 
     event.clear() 
     event_share.set() 
    for event in events: 
     Thread(target=set_event_share(event)).start() 

    event_share.wait() 

wait_events(event1, event2, event3) 
+0

Byłoby miło wiedzieć, który został wywołany – Har

0
def wait_for_event_timeout(*events): 
    while not all([e.isSet() for e in events]): 
     #Check to see if the event is set. Timeout 1 sec. 
     ev_wait_bool=[e.wait(1) for e in events] 
     # Process if all events are set. Change all to any to process if any event set 
     if all(ev_wait_bool): 
      logging.debug('processing event') 
     else: 
      logging.debug('doing other work') 


e1 = threading.Event() 
e2 = threading.Event() 

t3 = threading.Thread(name='non-block-multi', 
         target=wait_for_event_timeout, 
         args=(e1,e2)) 
t3.start() 

logging.debug('Waiting before calling Event.set()') 
time.sleep(5) 
e1.set() 
time.sleep(10) 
e2.set() 
logging.debug('Event is set') 
1

Rozszerzanie Claudiu's odpowiedź, gdzie można poczekać na zdarzenie 1 lub zdarzenie 2. 1 lub zdarzenia, a nawet 2.

from threading import Thread, Event, _Event 

class ConditionalEvent(_Event): 
    def __init__(self, events_list, condition): 
     _Event.__init__(self) 

     self.event_list = events_list 
     self.condition = condition 

     for e in events_list: 
      self._setup(e, self._state_changed) 

     self._state_changed() 

    def _state_changed(self): 
     bools = [e.is_set() for e in self.event_list] 
     if self.condition == 'or': 

      if any(bools): 
       self.set() 
      else: 
       self.clear() 

     elif self.condition == 'and': 

      if all(bools): 
       self.set() 
      else: 
       self.clear() 

    def _custom_set(self,e): 
     e._set() 
     e._state_changed() 

    def _custom_clear(self,e): 
     e._clear() 
     e._state_changed() 

    def _setup(self, e, changed_callback): 
     e._set = e.set 
     e._clear = e.clear 
     e._state_changed = changed_callback 
     e.set = lambda: self._custom_set(e) 
     e.clear = lambda: self._custom_clear(e) 

Przykład użycia będzie bardzo podobny jak przed

import time 

e1 = Event() 
e2 = Event() 

or_e = ConditionalEvent([e1, e2], 'or') 


Thread(target=wait_on, args=('e1', e1)).start() 
time.sleep(0.05) 
Thread(target=wait_on, args=('e2', e2)).start() 
time.sleep(0.05) 
Thread(target=wait_on, args=('or_e', or_e)).start() 
time.sleep(0.05) 

print "Firing e1 in 2 seconds..." 
time.sleep(2) 
e1.set() 
time.sleep(0.05) 

print "Firing e2 in 2 seconds..." 
time.sleep(2) 
e2.set() 
time.sleep(0.05) 
Powiązane problemy