Jaki jest najlepszy sposób oczekiwania (bez obracania), dopóki coś nie będzie dostępne w jednym z dwóch (wieloprocesowych) Queues, gdzie oba będą znajdować się w tym samym systemie?"wybierz" w wielu kolejkach wieloprocesorowych Pythona?
Odpowiedz
Wygląda na to, że nie ma jeszcze oficjalnego sposobu na poradzenie sobie z tym problemem. A przynajmniej nie na podstawie tego:
można spróbować coś takiego, co robi ten post - dostępu do podstawowych uchwytów plików rurze:
, a następnie użyj wybierz.
Można użyć wzoru podobnego do Observer, w którym subskrybenci kolejki są powiadamiani o zmianach stanu.
W tym przypadku można wyznaczyć wątek roboczy jako detektor w każdej kolejce, a gdy tylko otrzyma gotowy sygnał, może pracować na nowym elemencie, w przeciwnym razie zasnąć.
Cóż, 'get' jest destrukcyjny, więc nie można tak naprawdę obserwować samej kolejki, jak opisuje ją GoF. Nić do zaklinowania musiałaby być "obserwowana" - miałem nadzieję na mniejszy napływ niż dwa dodatkowe wątki. – cdleary
Ponadto, gdybym chciał mieć jeden punkt dostępu dla procesu wywołującego (jak w 'select'), potrzebowałbym kolejki wątkowo bezpiecznej na górze tych dwóch wątków. – cdleary
Wygląda na to, że używa się wątków, które przesyłają przychodzące elementy do pojedynczej kolejki, na którą czekasz, to praktyczny wybór w przypadku korzystania z procesu wieloprocesorowego w sposób niezależny od platformy.
Unikanie gwintów wymaga obsługi niskopoziomowych rur/FD, które są zarówno specyficzne dla platformy, jak i niełatwe w obsłudze zgodnie z interfejsem API wyższego poziomu.
Albo potrzebujesz Kolejki z możliwością ustawienia wywołań zwrotnych, które moim zdaniem są odpowiednim interfejsem wyższego poziomu. To znaczy. napiszesz coś takiego:
singlequeue = Queue() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... singlequeue.get()
Być może pakiet wieloprocesowy może rozwinąć ten interfejs API, ale jeszcze go tam nie ma. Ta koncepcja działa dobrze z py.execnet, która używa terminu "kanał" zamiast "kolejek", zobacz tutaj: http://tinyurl.com/nmtr4w
To byłby bardzo fajny interfejs! (Chociaż wyraźnie istnieje korzyść z utrzymywania ścisłych interfejsów stdlib, jak wspomina Jesse w raporcie błędów w odnośniku @ars). – cdleary
prawda, ale obecny publiczny interfejs API Kolejki nie obsługuje twojego przypadku użycia, który uważam za typowy. – hpk42
Jeśli jest to "częste" - zgłoś błąd + łatkę (z testami na miłość do Pete) na bugs.python.org i mogę ją ocenić dla wersji 2.7/3.x – jnoller
Właściwie możesz używać obiektów wieloprocesorowych.Queue w select.select. tj.
que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])
wybrałby tylko que, jeśli jest gotowa do odczytu.
Brak dokumentacji na ten temat. Czytałem kod źródłowy biblioteki multiprocessing.queue (na Linux-ie to zwykle coś takiego jak /usr/lib/python2.6/multiprocessing/queue.py), aby to znaleźć.
Z Queue.Queue Nie znalazłem żadnego sprytnego sposobu na zrobienie tego (i naprawdę bym chciał).
nie działa pod Windowsem. – fluke
Działa to świetnie na systemie Unix, ale w systemie Windows implementacja 'select.select' może zajmować się tylko gniazdami, a nie deskryptorami plików, a zatem to się nie udaje. –
Jaka jest główna różnica między 'Queue.Queue' a' multiprocessing.Queue' i czy 'multiprocessing.Queue' może być używany do wielowątkowości, a nie tylko do wieloprocesowości? – CMCDragonkai
Nie wiem, jak dobrze zaznaczenie w wieloprocesowej kolejce działa w systemie Windows. Ponieważ wybieranie w systemie Windows nasłuchuje gniazd, a nie uchwytów plików, podejrzewam, że mogą wystąpić problemy.
Moja odpowiedź brzmi: utworzyć wątek do odsłuchiwania każdej kolejki w sposób blokujący i umieścić wyniki w jednej kolejce odsłuchiwanej przez główny wątek, zasadniczo multipleksując poszczególne koleje w jeden.
Mój kod robi to:
"""
Allow multiple queues to be waited upon.
queue,value = multiq.select(list_of_queues)
"""
import queue
import threading
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
while True:
data = self.inq.get()
print ("thread reads data=",data)
result = (self.inq,data)
self.sharedq.put(result)
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
Poniższy rutynowe badanie pokazuje, w jaki sposób z niego korzystać:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
res=0
while not res==4:
while not q3.empty():
res = q3.get()[1]
print ("returning result =",res)
nadzieję, że to pomaga.
Tony Wallace
Nowa wersja powyższego kodu ...
Nie wiem, jak dobrze select w kolejce wieloprocesorowe działa w systemie Windows. Ponieważ wybieranie w systemie Windows nasłuchuje gniazd, a nie uchwytów plików, podejrzewam, że mogą wystąpić problemy.
Moja odpowiedź brzmi: utworzyć wątek do odsłuchiwania każdej kolejki w sposób blokujący i umieścić wyniki w jednej kolejce odsłuchiwanej przez główny wątek, zasadniczo multipleksując poszczególne koleje w jeden.
Mój kod robi to:
"""
Allow multiple queues to be waited upon.
An EndOfQueueMarker marks a queue as
"all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.
"""
import queue
import threading
class EndOfQueueMarker:
def __str___(self):
return "End of data marker"
pass
class queue_reader(threading.Thread):
def __init__(self,inq,sharedq):
threading.Thread.__init__(self)
self.inq = inq
self.sharedq = sharedq
def run(self):
q_run = True
while q_run:
data = self.inq.get()
result = (self.inq,data)
self.sharedq.put(result)
if data is EndOfQueueMarker:
q_run = False
class multi_queue(queue.Queue):
def __init__(self,list_of_queues):
queue.Queue.__init__(self)
self.qList = list_of_queues
self.qrList = []
for q in list_of_queues:
qr = queue_reader(q,self)
qr.start()
self.qrList.append(qr)
def get(self,blocking=True,timeout=None):
res = []
while len(res)==0:
if len(self.qList)==0:
res = (self,EndOfQueueMarker)
else:
res = queue.Queue.get(self,blocking,timeout)
if res[1] is EndOfQueueMarker:
self.qList.remove(res[0])
res = []
return res
def join(self):
for qr in self.qrList:
qr.join()
def select(list_of_queues):
outq = queue.Queue()
for q in list_of_queues:
qr = queue_reader(q,outq)
qr.start()
return outq.get()
Kod naśladowania jest moja próba rutyna, aby pokazać jak to działa:
import multiq
import queue
q1 = queue.Queue()
q2 = queue.Queue()
q3 = multiq.multi_queue([q1,q2])
q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
res = q3.get()[1]
print ("returning result =",res)
have_data = not(res==multiq.EndOfQueueMarker)
nie rób tego.
Umieść nagłówek wiadomości i wyślij je do wspólnej kolejki. Upraszcza to kod i będzie ogólnie czystszy.
- 1. Używanie potoków wieloprocesorowych w pytonie
- 2. Jak obsługiwać współbieżność bazy danych wieloprocesorowych Pythona, w szczególności z django?
- 3. Wykrywanie upuszczonych wiadomości w kolejkach ZeroMQ
- 4. Jak synchronizować zadania w różnych kolejkach wysyłkowych?
- 5. wybierz kombinacje z wielu list
- 6. SQL WYBIERZ z wielu tabel
- 7. Codeigniter: Wybierz z wielu tabel
- 8. jQuery UI wielu wybierz przeciągnij
- 9. SmartGit - Wybierz wszystkie pliki w wielu zatwierdzeniach
- 10. WYBIERZ COUNT (DISTINCT ...) błąd w wielu kolumnach?
- 11. Jednoczesne uruchamianie wielu skryptów Pythona
- 12. Obsługa wielu wersji Pythona w twoim kodzie?
- 13. Linq do Encji od wielu do wielu wybierz zapytanie
- 14. Linq. Wybierz jedną z wielu tabel
- 15. Wybierz element na podstawie wielu klas
- 16. Wybierz z wielu tabel Przecinek oddzielony
- 17. Najlepsza praktyka przetwarzania wielowątkowych wiadomości w kolejkach JMS
- 18. Tworzenie i niszczenie klas AVFoundation w kolejkach tła?
- 19. Pandy Pythona, opcje drukowania dla wielu linii
- 20. Moduł Pythona do wielu zmiennych globalnej optymalizacji
- 21. Wybierz katalog dla wprowadzania wielu plików HTML5 w przeglądarce Firefox?
- 22. równoczesne uruchamianie wielu instancji skryptów Pythona
- 23. Scalanie wielu wartości kolumn w jednej kolumnie w pand Pythona
- 24. przesyłanie wielu plików w jednym żądaniu użyciu żądań Pythona moduł
- 25. Użyj wielu ograniczników znaków w Pandonie Pythona to_csv
- 26. Krótki kod Pythona, aby powiedzieć "Wybierz niższą wartość"?
- 27. MYSQL - WYBIERZ z wielu wierszy, ten sam użytkownik, różne wartości
- 28. Wybierz podzbiór wierszy ramek danych przy użyciu wielu warunków.
- 29. Pokaż/Ukryj wielu div z wybierz za pomocą jQuery
- 30. JAVA ustaw/wybierz określoną kartę sieciową z wielu (UDP)
+1 Wow, ładne znaleziska! Moje Google-fu wydaje się słabe ... – cdleary
Niestety drugi adres URL już nie działa –