2016-11-29 20 views
8

Tak więc mam system z producentem i konsumentem są połączone przez kolejkę o nieograniczonym rozmiarze, ale jeśli konsument wielokrotnie wywołuje get, dopóki nie zostanie zgłoszony Pusty wyjątek, nie kasuje kolejki .Zmiana wielkości bufora w trybie wieloprocesowym.Queue

Wierzę, że dzieje się tak dlatego, że wątek w kolejce po stronie konsumenta, który serializuje obiekty w gnieździe, zostaje zablokowany po zapełnieniu bufora gniazda, a więc czeka aż bufor ma miejsce, jednak jest to możliwe dla konsumenta wywołanie get "zbyt szybko" i dlatego uważa, że ​​kolejka jest pusta, podczas gdy w rzeczywistości wątek po drugiej stronie ma znacznie więcej danych do wysłania, ale nie może serializować go wystarczająco szybko, aby zapobiec wyświetlaniu pustego gniazda dla konsumenta .

Wierzę, że ten problem zostałby złagodzony, gdybym mógł zmienić rozmiar bufora na odpowiednim gnieździe (jestem oparty na systemie Windows). O ile mogę zobaczyć, co muszę zrobić, wtedy jest coś takiego:

import multiprocessing.connections as conns 
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows 
import multiprocessing.Queue as q 

Jeśli robię powyższego, to znaczy, że gdy multirprocssing Zainicjowanie kolejce będzie korzystał z nowego rozmiaru bufora, który mam ustawiony w wersja wieloprocesowych połączeń, które już zaimportowałem? Czy to jest poprawne?

Sądzę również, że wpłynie to tylko na okna, ponieważ funkcja BUFSIZE nie jest używana na komputerach z systemem Linux, ponieważ ich wszystkie gniazda są domyślnie ustawione na 60 kilobajtów?

Czy ktoś próbował już wcześniej? Czy miałoby to skutki uboczne w oknach? A jakie są podstawowe ograniczenia rozmiarów buforów gniazd w oknach?

=================== Próbkę demonstrujący ===================

# import multiprocessing.connection as conn 
# conn.BUFSIZE = 2 ** 19 
import sys 
import multiprocessing as mp 
from Queue import Empty 
from time import sleep 

total_length = 10**8 

def supplier(q): 
    print "Starting feeder" 
    for i in range(total_length) : 
     q.put(i) 


if __name__=="__main__": 

    queue = mp.Queue() 

    p = mp.Process(target=supplier, args=(queue,)) 

    p.start() 

    sleep(120) 

    returned = [] 
    while True : 
     try : 
      returned.append(queue.get(block=False)) 
     except Empty : 
      break 

    print len(returned) 
    print len(returned) == total_length 

    p.terminate() 
    sys.exit() 

Ta próbka, po uruchomieniu na oknach, zwykle usuwa tylko około 160 000 pozycji z kolejki, ponieważ główny wątek może opróżnić bufor szybciej, niż jest on uzupełniany przez dostawcę, i ostatecznie próbuje wyciągnąć z kolejki, gdy bufor jest pusty i informuje, że jest pusty.

Teoretycznie można poprawić ten problem dzięki większemu rozmiarowi bufora. Dwie linie u góry będą, jak sądzę, w systemie Windows, zwiększą domyślny rozmiar bufora dla potoku.

Jeśli je skomentujesz, ten skrypt pobierze więcej danych, zanim zostanie zakończony, ponieważ ma znacznie więcej. Moje główne pytania to: 1) Czy to rzeczywiście działa. 2) Czy istnieje sposób, aby ten kod użył tego samego rozmiaru bufora bazowego w systemie Windows i Linuksie 3) Czy są jakieś nieoczekiwane efekty uboczne przy ustawianiu dużych rozmiarów buforów dla rur.

Jestem świadomy, że generalnie nie ma sposobu, aby dowiedzieć się, czy wyciągnął wszystkie dane z kolejki (- biorąc pod uwagę, że dostawca działa stale i produkuje dane bardzo nierównomiernie), ale szukam sposobów, aby popraw to na zasadzie najlepszego wysiłku.

+1

Myślę, że mały przykład demonstrujący ten problem byłby przydatny. – pradyunsg

Odpowiedz

6

Aktualizacja:

przydatna ogniwem systemu Windows rur dla osób, które potrzebują go w przyszłości (link jest przez OP, phil_20686): https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

Origianl:

BUFSIZE działa tylko wtedy, gdy platforma jest win32.

multiprocessing.Queue jest zbudowany na górze rury, jeśli zmienisz BUFSIZE, kolejka, którą wygenerujesz, użyje zaktualizowanej wartości. patrz poniżej:

class Queue(object): 

    def __init__(self, maxsize=0): 
     if maxsize <= 0: 
      maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX 
     self._maxsize = maxsize 
     self._reader, self._writer = Pipe(duplex=False) 

Gdy platforma jest win32, kod rur wywoła następujący kod:

def Pipe(duplex=True): 
    ''' 
    Returns pair of connection objects at either end of a pipe 
    ''' 
    address = arbitrary_address('AF_PIPE') 
    if duplex: 
     openmode = win32.PIPE_ACCESS_DUPLEX 
     access = win32.GENERIC_READ | win32.GENERIC_WRITE 
     obsize, ibsize = BUFSIZE, BUFSIZE 
    else: 
     openmode = win32.PIPE_ACCESS_INBOUND 
     access = win32.GENERIC_WRITE 
     obsize, ibsize = 0, BUFSIZE 

    h1 = win32.CreateNamedPipe(
     address, openmode, 
     win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | 
     win32.PIPE_WAIT, 
     1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL 
     ) 

Widać, że kiedy duplex jest False, outbuffer rozmiar wynosi 0 i inbuffer rozmiar jest bufsize.

inbuffer to liczba bajtów zarezerwowanych na bufor wejściowy. 2 ** 16 = 65536, to maksymalna ilość bajtów może być zapisana w jednej operacji bez blokowania, ale pojemność bufora jest różna w różnych systemach, zmienia się nawet w tym samym systemie, dlatego trudno powiedzieć stronie efekt, gdy ustawisz rurę na maksymalną wartość.

+0

Istnieje kilka ważnych/dobrych informacji o zachowaniu okien Windows w sekcji uwag: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx Jeśli dodasz link/podsumowanie, zaakceptuję to jako odpowiedź. –

+0

@ phil_20686 Zaktualizowałem swoją odpowiedź i dodałem tam podany link. – haifzhan

+0

Zaakceptowałem to jako odpowiedź, ale etykieta przepełnienia stosu FYI to zawsze podsumowanie treści linków, aby uniknąć zgniatania linków, jeśli strona porusza się itp. W ciągu kilku lat. Dzięki –

Powiązane problemy