2012-07-02 15 views
7

Muszę wiedzieć, kiedy kolejka jest zamknięta i nie będzie więcej elementów, więc mogę zakończyć iterację.Kolejka iteracyjna w języku Python

Zrobiłem to poprzez umieszczenie Sentinel w kolejce:

from Queue import Queue 

class IterableQueue(Queue): 

    _sentinel = object() 

    def __iter__(self): 
     return self 

    def close(self): 
     self.put(self._sentinel) 

    def next(self): 
     item = self.get() 
     if item is self._sentinel: 
      raise StopIteration 
     else: 
      return item 

Biorąc pod uwagę, że jest to bardzo powszechne wykorzystanie do kolejki, nie istnieje wbudowane realizacja?

+0

mogę użyć Sentinel lub flagę w wątku, aby zatrzymać iteracji nad kolejce. Na później zwykle czekam z limitem czasu. – jdi

Odpowiedz

10

wartownik jest rozsądnym sposobem dla producentów, aby wysłać wiadomość, że nie ma więcej zadań kolejki są w przygotowaniu.

FWIW, kod może zostać uproszczony trochę z postaci dwóch argument iter():

from Queue import Queue 

class IterableQueue(Queue): 

    _sentinel = object() 

    def __iter__(self): 
     return iter(self.get, self._sentinel) 

    def close(self): 
     self.put(self._sentinel) 
4

Moduł wieloprocesowy ma własną wersję Queue, która zawiera metodę close. Nie jestem pewien, jak to działa w wątkach, ale warto spróbować. Nie widzę, dlaczego nie powinny działać tak samo:

from multiprocessing import Queue 

q = Queue() 
q.put(1) 
q.get_nowait() 
# 1 
q.close() 
q.get_nowait() 
# ... 
# IOError: handle out of range in select() 

Można po prostu złapać IOError jako bliskiego sygnału.

TEST

from multiprocessing import Queue 
from threading import Thread 

def worker(q): 
    while True: 
     try: 
      item = q.get(timeout=.5) 
     except IOError: 
      print "Queue closed. Exiting thread." 
      return 
     except: 
      continue 
     print "Got item:", item 

q = Queue() 
for i in xrange(3): 
    q.put(i) 
t = Thread(target=worker, args=(q,)) 
t.start() 
# Got item: 0 
# Got item: 1 
# Got item: 2 
q.close() 
# Queue closed. Exiting thread. 

Choć szczerze mówiąc, to nie jest zbyt dużo inny niż ustawienie flagi na Queue.Queue. Multiprocessing.Queue jest tylko przy użyciu zamkniętego deskryptor jako flaga:

from Queue import Queue 

def worker2(q): 
    while True: 
     if q.closed: 
      print "Queue closed. Exiting thread." 
      return 
     try: 
      item = q.get(timeout=.5) 
     except: 
      continue 
     print "Got item:", item 

q = Queue() 
q.closed = False 
for i in xrange(3): 
    q.put(i) 
t = Thread(target=worker2, args=(q,)) 
t.start() 
# Got item: 0 
# Got item: 1 
# Got item: 2 
q.closed = True 
# Queue closed. Exiting thread.