2013-07-10 8 views
12

Wymagane jest uruchomienie pięciu wątków i czekanie tylko w najszybszym wątku. Wszystkie pięć wątków poszło szukać tych samych danych w 5 kierunkach, a jedno wystarczy, aby kontynuować przepływ sterowania.Jak poczekać, aż zakończy się tylko pierwszy wątek w Pythonie

Właściwie, muszę poczekać na pierwsze dwa wątki, aby powrócić, aby zweryfikować się nawzajem. Ale myślę, że wiem, jak czekać na najszybszy. Mogę wymyślić, jak czekać na drugą najszybszą.

Dużo mówić o join(timeout), ale nie wiesz z góry, który z nich należy poczekać (z którego należy wcześniej zastosować join).

Odpowiedz

1

Możesz użyć zdarzenia dla tego. Zobacz: http://docs.python.org/2/library/threading.html#event-objects Chodzi o to, że wątki robocze wywołują zdarzenie po ich zakończeniu. Główny wątek czeka na to wydarzenie przed kontynuowaniem. W wątku roboczym można ustawić zmienną (muteksowaną), aby identyfikować się ze zdarzeniem.

1

Albo po prostu śledź wszystkie ukończone wątki na liście i pozwól drugiemu wątkowi zakończyć obsługę wszystkiego, co ma być zrobione, listy Pythona są chronione wątkami.

finished_threads = [] 
event = threading.Event() 

def func(): 
    do_important_stuff() 

    thisthread = threading.current_thread() 
    finished_threads.append(thisthread) 
    if len(finished_threads) > 1 and finished_threads[1] == thisthread: 
     #yay we are number two! 
     event.set() 

for i in range(5): 
    threading.Thread(target=func).start() 

event.wait() 
+0

Nie odpowiada to bitowi o wątku głównym, który czeka aż do ukończenia dwóch wątków, a następnie kontynuuje: zamiast tego przeniesiono całą pozostałą czynność na drugi wątek, aby zakończyć, co może nie być tym, co jest potrzebne. – Duncan

+1

prawda; handle_two_threads_done() powinien raczej ustawić na zdarzeniu. Edytowane. –

+0

Ummm, listy Pythona to wątki bezpieczne? Naprawdę? Myślałem, że trzeba użyć Queue() dla spójności wątku! –

3

Jeśli masz jakieś pętli przetwarzania w Twoich wątków, następujący kod będzie je zakończyć, gdy jedna kończy się za pomocą threading.Event():

def my_thread(stop_event): 
    while not stop_event.is_set(): 
     # do stuff in a loop 

     # some check if stuff is complete 
     if stuff_complete: 
      stop_event.set() 
      break 

def run_threads(): 
    # create a thread event 
    a_stop_event = threading.Event() 

    # spawn the threads 
    for x in range(5): 
     t = threading.Thread(target=my_thread, args=[a_stop_event]) 
     t.start() 

    while not a_stop_event.is_set(): 
     # wait for an event 
     time.sleep(0.1) 

    print "At least one thread is done" 

Jeśli proces jest „tani” lub pojedynczy wątek typu żądanie-odpowiedź (np. na przykład asynchroniczne żądanie HTTP), a następnie Duncan's answer jest dobrym podejściem.

13

Użyj kolejki: każdy wątek po ukończeniu umieszcza wynik w kolejce, a potem po prostu trzeba przeczytać odpowiednią liczbę wyników i zignorować resztę:

#!python3.3 
import queue # For Python 2.x use 'import Queue as queue' 
import threading, time, random 

def func(id, result_queue): 
    print("Thread", id) 
    time.sleep(random.random() * 5) 
    result_queue.put((id, 'done')) 

def main(): 
    q = queue.Queue() 
    threads = [ threading.Thread(target=func, args=(i, q)) for i in range(5) ] 
    for th in threads: 
     th.daemon = True 
     th.start() 

    result1 = q.get() 
    result2 = q.get() 

    print("Second result: {}".format(result2)) 

if __name__=='__main__': 
    main() 

Dokumentacja dla Queue.get() (bez argumentów to odpowiada Queue.get(True, None):

Queue.get ([blok [, czas oczekiwania]])

Usunąć i powrót elementu z kolejka. Jeśli opcjonalny blok args ma wartość true, a limit czasu to None (domyślna wartość to ), w razie potrzeby blokuj, aż element będzie dostępny. Jeśli limit czasu to dodatnia liczba, blokuje ona najwyżej limit czasu i podnosi Pusty wyjątek, jeśli żaden element nie był dostępny w tym czasie. W przeciwnym razie (blok jest fałszywy), zwróć element, jeśli jest on natychmiast dostępny, inaczej podnieś Pusty wyjątek (w takim przypadku limit czasu zostanie zignorowany).

+1

Czy to nie podniesie wyjątku 'Empty', jeśli kolejka jest pusta, gdy wykonujesz' q.get() '? – Michael

+2

@Michael, domyślną wartością 'q.get()' jest uzyskanie dostępu blokującego, więc nie spowoduje to odrzucenia wyjątku, zamiast tego zablokuje główny wątek, dopóki nie będzie dostępny wynik. – Duncan

1

Metoda Duncana jest prawdopodobnie najlepsza i jest to, co polecam. Jestem lekko zirytowany brakiem "czekania na ukończony wątek do ukończenia" wcześniej, więc napisałem to, żeby go wypróbować. Wydaje się działać. Po prostu użyj MWThread zamiast threading.thread, a otrzymasz nową funkcję wait_for_thread.

Zmienne globalne są nieco nieostre; alternatywą byłoby uczynienie z nich zmiennych na poziomie klasy. Ale jeśli jest to ukryte w module (mwthread.py lub cokolwiek innego), powinno być dobrze w każdym przypadku.

#! /usr/bin/env python 

# Example of how to "wait for"/join whichever threads is/are done, 
# in (more or less) the order they're done. 

import threading 
from collections import deque 

_monitored_threads = [] 
_exited_threads = deque() 
_lock = threading.Lock() 
_cond = threading.Condition(_lock) 

class MWThread(threading.Thread): 
    """ 
    multi-wait-able thread, or monitored-wait-able thread 
    """ 
    def run(self): 
     tid = threading.current_thread() 
     try: 
      with _lock: 
       _monitored_threads.append(tid) 
      super(MWThread, self).run() 
     finally: 
      with _lock: 
       _monitored_threads.remove(tid) 
       _exited_threads.append(tid) 
       _cond.notifyAll() 

def wait_for_thread(timeout=None): 
    """ 
    Wait for some thread(s) to have finished, with optional 
    timeout. Return the first finished thread instance (which 
    is removed from the finished-threads queue). 

    If there are no unfinished threads this returns None 
    without waiting. 
    """ 
    with _cond: 
     if not _exited_threads and _monitored_threads: 
      _cond.wait(timeout) 
     if _exited_threads: 
      result = _exited_threads.popleft() 
     else: 
      result = None 
    return result 

def main(): 
    print 'testing this stuff' 
    def func(i): 
     import time, random 
     sleeptime = (random.random() * 2) + 1 
     print 'thread', i, 'starting - sleep for', sleeptime 
     time.sleep(sleeptime) 
     print 'thread', i, 'finished' 

    threads = [MWThread(target=func, args=(i,)) for i in range(3)] 
    for th in threads: 
     th.start() 
    i = 0 
    while i < 3: 
     print 'main: wait up to .5 sec' 
     th = wait_for_thread(.5) 
     if th: 
      print 'main: got', th 
      th.join() 
      i += 1 
     else: 
      print 'main: timeout' 
    print 'I think I collected them all' 
    print 'result of wait_for_thread():' 
    print wait_for_thread() 

if __name__ == '__main__': 
    main() 
Powiązane problemy