Moduł wieloprocesowy ma własną wersję Queue, która zawiera metodę close
. Nie jestem pewien, jak to działa w wątkach, ale warto spróbować. Nie widzę, dlaczego nie powinny działać tak samo:
from multiprocessing import Queue
q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()
Można po prostu złapać IOError jako bliskiego sygnału.
TEST
from multiprocessing import Queue
from threading import Thread
def worker(q):
while True:
try:
item = q.get(timeout=.5)
except IOError:
print "Queue closed. Exiting thread."
return
except:
continue
print "Got item:", item
q = Queue()
for i in xrange(3):
q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.
Choć szczerze mówiąc, to nie jest zbyt dużo inny niż ustawienie flagi na Queue.Queue. Multiprocessing.Queue jest tylko przy użyciu zamkniętego deskryptor jako flaga:
from Queue import Queue
def worker2(q):
while True:
if q.closed:
print "Queue closed. Exiting thread."
return
try:
item = q.get(timeout=.5)
except:
continue
print "Got item:", item
q = Queue()
q.closed = False
for i in xrange(3):
q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.
mogę użyć Sentinel lub flagę w wątku, aby zatrzymać iteracji nad kolejce. Na później zwykle czekam z limitem czasu. – jdi