2009-10-14 14 views
12

Chcę móc dołączyć() do klasy Queue, ale po jakimś czasie limit czasu, jeśli połączenie jeszcze nie wróciło. Jaki jest najlepszy sposób na zrobienie tego? Czy można to zrobić poprzez podklasowanie kolejki \ przy użyciu metaclass?Dodaj argument timeout do kolejki Pythona.join()

+1

by upewnić się, że wszystko skończy wątek pracownik task_done() – tuergeist

Odpowiedz

17

Utworzenie podklasy Queue to prawdopodobnie najlepszy sposób. Coś jak to powinno działać (niesprawdzone):

def join_with_timeout(self, timeout): 
    self.all_tasks_done.acquire() 
    try: 
     endtime = time() + timeout 
     while self.unfinished_tasks: 
      remaining = endtime - time() 
      if remaining <= 0.0: 
       raise NotFinished 
      self.all_tasks_done.wait(remaining) 
    finally: 
     self.all_tasks_done.release() 
+1

Dzięki! Skąd masz informacje o all_task_done? Zajrzałem do http://docs.python.org/library/queue.html#module-Queue, ale nie widzę żadnej wzmianki o tym memu ... – olamundo

+3

Możesz przeczytać kod źródłowy Queue. Ma parametr 'timeout' wprowadzony dla' put' i 'get', łatwo było rozszerzyć' join', aby zastosować podobne podejście. –

+0

Hmm, inteligentne rozwiązanie;) – tuergeist

0

Najpierw należy upewnić się, że wszystkie wątki pracujące w wyjściu kolejki z task_done()

Aby wdrożyć funkcję limitu czasu z Queue można owinąć kod kolejce za w wątku i dodać czas oczekiwania na ten temat za pomocą Thread.join([timeout])

nietestowanego przykład zarys tego, co proponuję

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

def queuefunc(): 
    q = Queue() 
    for i in range(num_worker_threads): 
     t = Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    for item in source(): 
     q.put(item) 

    q.join()  # block until all tasks are done 

t = Thread(target=queuefunc) 
t.start() 
t.join(100) # timeout applies here 
10

join() metoda jest o czekając na wszystkich zadań do zrobienia. Jeśli nie obchodzi, czy zadania zostały faktycznie zakończone, można okresowo sondować niedokończone liczyć zadanie:

stop = time() + timeout 
while q.unfinished_tasks and time() < stop: 
    sleep(1) 

Pętla ta będzie istnieć albo gdy zadania są wykonywane lub gdy upłynie limit czasu.

Raymond