2009-07-14 13 views

Odpowiedz

14

Wygląda na to, że nie ma jeszcze oficjalnego sposobu na poradzenie sobie z tym problemem. A przynajmniej nie na podstawie tego:

można spróbować coś takiego, co robi ten post - dostępu do podstawowych uchwytów plików rurze:

, a następnie użyj wybierz.

+0

+1 Wow, ładne znaleziska! Moje Google-fu wydaje się słabe ... – cdleary

+1

Niestety drugi adres URL już nie działa –

1

Można użyć wzoru podobnego do Observer, w którym subskrybenci kolejki są powiadamiani o zmianach stanu.

W tym przypadku można wyznaczyć wątek roboczy jako detektor w każdej kolejce, a gdy tylko otrzyma gotowy sygnał, może pracować na nowym elemencie, w przeciwnym razie zasnąć.

+0

Cóż, 'get' jest destrukcyjny, więc nie można tak naprawdę obserwować samej kolejki, jak opisuje ją GoF. Nić do zaklinowania musiałaby być "obserwowana" - miałem nadzieję na mniejszy napływ niż dwa dodatkowe wątki. – cdleary

+0

Ponadto, gdybym chciał mieć jeden punkt dostępu dla procesu wywołującego (jak w 'select'), potrzebowałbym kolejki wątkowo bezpiecznej na górze tych dwóch wątków. – cdleary

2

Wygląda na to, że używa się wątków, które przesyłają przychodzące elementy do pojedynczej kolejki, na którą czekasz, to praktyczny wybór w przypadku korzystania z procesu wieloprocesorowego w sposób niezależny od platformy.

Unikanie gwintów wymaga obsługi niskopoziomowych rur/FD, które są zarówno specyficzne dla platformy, jak i niełatwe w obsłudze zgodnie z interfejsem API wyższego poziomu.

Albo potrzebujesz Kolejki z możliwością ustawienia wywołań zwrotnych, które moim zdaniem są odpowiednim interfejsem wyższego poziomu. To znaczy. napiszesz coś takiego:

 
    singlequeue = Queue() 
    incoming_queue1.setcallback(singlequeue.put) 
    incoming_queue2.setcallback(singlequeue.put) 
    ... 
    singlequeue.get() 

Być może pakiet wieloprocesowy może rozwinąć ten interfejs API, ale jeszcze go tam nie ma. Ta koncepcja działa dobrze z py.execnet, która używa terminu "kanał" zamiast "kolejek", zobacz tutaj: http://tinyurl.com/nmtr4w

+0

To byłby bardzo fajny interfejs! (Chociaż wyraźnie istnieje korzyść z utrzymywania ścisłych interfejsów stdlib, jak wspomina Jesse w raporcie błędów w odnośniku @ars). – cdleary

+0

prawda, ale obecny publiczny interfejs API Kolejki nie obsługuje twojego przypadku użycia, który uważam za typowy. – hpk42

+0

Jeśli jest to "częste" - zgłoś błąd + łatkę (z testami na miłość do Pete) na bugs.python.org i mogę ją ocenić dla wersji 2.7/3.x – jnoller

21

Właściwie możesz używać obiektów wieloprocesorowych.Queue w select.select. tj.

que = multiprocessing.Queue() 
(input,[],[]) = select.select([que._reader],[],[]) 

wybrałby tylko que, jeśli jest gotowa do odczytu.

Brak dokumentacji na ten temat. Czytałem kod źródłowy biblioteki multiprocessing.queue (na Linux-ie to zwykle coś takiego jak /usr/lib/python2.6/multiprocessing/queue.py), aby to znaleźć.

Z Queue.Queue Nie znalazłem żadnego sprytnego sposobu na zrobienie tego (i naprawdę bym chciał).

+1

nie działa pod Windowsem. – fluke

+3

Działa to świetnie na systemie Unix, ale w systemie Windows implementacja 'select.select' może zajmować się tylko gniazdami, a nie deskryptorami plików, a zatem to się nie udaje. –

+0

Jaka jest główna różnica między 'Queue.Queue' a' multiprocessing.Queue' i czy 'multiprocessing.Queue' może być używany do wielowątkowości, a nie tylko do wieloprocesowości? – CMCDragonkai

1

Nie wiem, jak dobrze zaznaczenie w wieloprocesowej kolejce działa w systemie Windows. Ponieważ wybieranie w systemie Windows nasłuchuje gniazd, a nie uchwytów plików, podejrzewam, że mogą wystąpić problemy.

Moja odpowiedź brzmi: utworzyć wątek do odsłuchiwania każdej kolejki w sposób blokujący i umieścić wyniki w jednej kolejce odsłuchiwanej przez główny wątek, zasadniczo multipleksując poszczególne koleje w jeden.

Mój kod robi to:

""" 
Allow multiple queues to be waited upon. 

queue,value = multiq.select(list_of_queues) 
""" 
import queue 
import threading 

class queue_reader(threading.Thread): 
    def __init__(self,inq,sharedq): 
     threading.Thread.__init__(self) 
     self.inq = inq 
     self.sharedq = sharedq 
    def run(self): 
     while True: 
      data = self.inq.get() 
      print ("thread reads data=",data) 
      result = (self.inq,data) 
      self.sharedq.put(result) 

class multi_queue(queue.Queue): 
    def __init__(self,list_of_queues): 
     queue.Queue.__init__(self) 
     for q in list_of_queues: 
      qr = queue_reader(q,self) 
      qr.start() 

def select(list_of_queues): 
    outq = queue.Queue() 
    for q in list_of_queues: 
     qr = queue_reader(q,outq) 
     qr.start() 
    return outq.get() 

Poniższy rutynowe badanie pokazuje, w jaki sposób z niego korzystać:

import multiq 
import queue 

q1 = queue.Queue() 
q2 = queue.Queue() 

q3 = multiq.multi_queue([q1,q2]) 

q1.put(1) 
q2.put(2) 
q1.put(3) 
q1.put(4) 

res=0 
while not res==4: 
    while not q3.empty(): 
     res = q3.get()[1] 
     print ("returning result =",res) 

nadzieję, że to pomaga.

Tony Wallace

1

Nowa wersja powyższego kodu ...

Nie wiem, jak dobrze select w kolejce wieloprocesorowe działa w systemie Windows. Ponieważ wybieranie w systemie Windows nasłuchuje gniazd, a nie uchwytów plików, podejrzewam, że mogą wystąpić problemy.

Moja odpowiedź brzmi: utworzyć wątek do odsłuchiwania każdej kolejki w sposób blokujący i umieścić wyniki w jednej kolejce odsłuchiwanej przez główny wątek, zasadniczo multipleksując poszczególne koleje w jeden.

Mój kod robi to:

""" 
Allow multiple queues to be waited upon. 

An EndOfQueueMarker marks a queue as 
    "all data sent on this queue". 
When this marker has been accessed on 
all input threads, this marker is returned 
by the multi_queue. 

""" 
import queue 
import threading 

class EndOfQueueMarker: 
    def __str___(self): 
     return "End of data marker" 
    pass 

class queue_reader(threading.Thread): 
    def __init__(self,inq,sharedq): 
     threading.Thread.__init__(self) 
     self.inq = inq 
     self.sharedq = sharedq 
    def run(self): 
     q_run = True 
     while q_run: 
      data = self.inq.get() 
      result = (self.inq,data) 
      self.sharedq.put(result) 
      if data is EndOfQueueMarker: 
       q_run = False 

class multi_queue(queue.Queue): 
    def __init__(self,list_of_queues): 
     queue.Queue.__init__(self) 
     self.qList = list_of_queues 
     self.qrList = [] 
     for q in list_of_queues: 
      qr = queue_reader(q,self) 
      qr.start() 
      self.qrList.append(qr) 
    def get(self,blocking=True,timeout=None): 
     res = [] 
     while len(res)==0: 
      if len(self.qList)==0: 
       res = (self,EndOfQueueMarker) 
      else: 
       res = queue.Queue.get(self,blocking,timeout) 
       if res[1] is EndOfQueueMarker: 
        self.qList.remove(res[0]) 
        res = [] 
     return res 

    def join(self): 
     for qr in self.qrList: 
      qr.join() 

def select(list_of_queues): 
    outq = queue.Queue() 
    for q in list_of_queues: 
     qr = queue_reader(q,outq) 
     qr.start() 
    return outq.get() 

Kod naśladowania jest moja próba rutyna, aby pokazać jak to działa:

import multiq 
import queue 

q1 = queue.Queue() 
q2 = queue.Queue() 

q3 = multiq.multi_queue([q1,q2]) 

q1.put(1) 
q2.put(2) 
q1.put(3) 
q1.put(4) 
q1.put(multiq.EndOfQueueMarker) 
q2.put(multiq.EndOfQueueMarker) 
res=0 
have_data = True 
while have_data: 
    res = q3.get()[1] 
    print ("returning result =",res) 
    have_data = not(res==multiq.EndOfQueueMarker) 
-2

nie rób tego.

Umieść nagłówek wiadomości i wyślij je do wspólnej kolejki. Upraszcza to kod i będzie ogólnie czystszy.

Powiązane problemy