Tak więc mam system z producentem i konsumentem są połączone przez kolejkę o nieograniczonym rozmiarze, ale jeśli konsument wielokrotnie wywołuje get, dopóki nie zostanie zgłoszony Pusty wyjątek, nie kasuje kolejki .Zmiana wielkości bufora w trybie wieloprocesowym.Queue
Wierzę, że dzieje się tak dlatego, że wątek w kolejce po stronie konsumenta, który serializuje obiekty w gnieździe, zostaje zablokowany po zapełnieniu bufora gniazda, a więc czeka aż bufor ma miejsce, jednak jest to możliwe dla konsumenta wywołanie get "zbyt szybko" i dlatego uważa, że kolejka jest pusta, podczas gdy w rzeczywistości wątek po drugiej stronie ma znacznie więcej danych do wysłania, ale nie może serializować go wystarczająco szybko, aby zapobiec wyświetlaniu pustego gniazda dla konsumenta .
Wierzę, że ten problem zostałby złagodzony, gdybym mógł zmienić rozmiar bufora na odpowiednim gnieździe (jestem oparty na systemie Windows). O ile mogę zobaczyć, co muszę zrobić, wtedy jest coś takiego:
import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q
Jeśli robię powyższego, to znaczy, że gdy multirprocssing Zainicjowanie kolejce będzie korzystał z nowego rozmiaru bufora, który mam ustawiony w wersja wieloprocesowych połączeń, które już zaimportowałem? Czy to jest poprawne?
Sądzę również, że wpłynie to tylko na okna, ponieważ funkcja BUFSIZE nie jest używana na komputerach z systemem Linux, ponieważ ich wszystkie gniazda są domyślnie ustawione na 60 kilobajtów?
Czy ktoś próbował już wcześniej? Czy miałoby to skutki uboczne w oknach? A jakie są podstawowe ograniczenia rozmiarów buforów gniazd w oknach?
=================== Próbkę demonstrujący ===================
# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep
total_length = 10**8
def supplier(q):
print "Starting feeder"
for i in range(total_length) :
q.put(i)
if __name__=="__main__":
queue = mp.Queue()
p = mp.Process(target=supplier, args=(queue,))
p.start()
sleep(120)
returned = []
while True :
try :
returned.append(queue.get(block=False))
except Empty :
break
print len(returned)
print len(returned) == total_length
p.terminate()
sys.exit()
Ta próbka, po uruchomieniu na oknach, zwykle usuwa tylko około 160 000 pozycji z kolejki, ponieważ główny wątek może opróżnić bufor szybciej, niż jest on uzupełniany przez dostawcę, i ostatecznie próbuje wyciągnąć z kolejki, gdy bufor jest pusty i informuje, że jest pusty.
Teoretycznie można poprawić ten problem dzięki większemu rozmiarowi bufora. Dwie linie u góry będą, jak sądzę, w systemie Windows, zwiększą domyślny rozmiar bufora dla potoku.
Jeśli je skomentujesz, ten skrypt pobierze więcej danych, zanim zostanie zakończony, ponieważ ma znacznie więcej. Moje główne pytania to: 1) Czy to rzeczywiście działa. 2) Czy istnieje sposób, aby ten kod użył tego samego rozmiaru bufora bazowego w systemie Windows i Linuksie 3) Czy są jakieś nieoczekiwane efekty uboczne przy ustawianiu dużych rozmiarów buforów dla rur.
Jestem świadomy, że generalnie nie ma sposobu, aby dowiedzieć się, czy wyciągnął wszystkie dane z kolejki (- biorąc pod uwagę, że dostawca działa stale i produkuje dane bardzo nierównomiernie), ale szukam sposobów, aby popraw to na zasadzie najlepszego wysiłku.
Myślę, że mały przykład demonstrujący ten problem byłby przydatny. – pradyunsg