6

Niedawno rozpocząłem naukę 0MQ. Wcześniej dzisiaj natknąłem się na blog, Python Multiprocessing with ZeroMQ. Mówił o the ventilator pattern w przewodniku 0MQ, o którym czytam, więc postanowiłem spróbować.Dlaczego ten skrypt Python 0MQ dla komputerów rozproszonych zawiesza się przy ustalonym rozmiarze wejściowym?

Zamiast po prostu obliczać produkty liczbowe według pracowników, tak jak robi to pierwotny kod, zdecydowałem się spróbować, aby respirator wysyłał duże tablice do pracowników za pomocą wiadomości 0mq. Poniżej znajduje się kod, którego używam do moich "eksperymentów".

Jak zaznaczono w komentarzu poniżej, za każdym razem, gdy próbowałem zwiększyć zmienną string_length na liczbę większą niż 3MB, kod zawiesił się.

Typowy symptom: powiedzmy, że ustawiamy string_length na 4MB (tj. 4194304), to być może menedżer wyników otrzymuje wynik od jednego pracownika, a następnie kod po prostu zatrzymuje się. htop pokazuje, że 2 rdzenie niewiele robią. Monitor ruchu sieciowego Etherape również nie pokazuje ruchu w interfejsie lo.

Do tej pory, po godzinach rozglądania się, nie byłem w stanie dowiedzieć się, co jest przyczyną tego, i byłbym wdzięczny za wskazówkę lub dwa, dlaczego i jakiekolwiek rozwiązanie tego problemu. Dzięki!

Używam 64-bitowego Ubuntu na notebooku Dell z procesorem Intel Core z powodu procesora, 8 GB RAM, 80 GB Intel X25MG2 SSD, Python 2.7.1+, libzmq1 2.1.10-1chl1 ~ natty1, python-pyzmq 2.1.10- 1chl1 ~ natty1

import time 
import zmq 
from multiprocessing import Process, cpu_count 

np = cpu_count() 
pool_size = np 
number_of_elements = 128 
# Odd, why once the slen is bumped to 3MB or above, the code hangs? 
string_length = 1024 * 1024 * 3 

def create_inputs(nelem, slen, pb=True): 
    ''' 
    Generates an array that contains nelem fix-sized (of slen bytes) 
    random strings and an accompanying array of hexdigests of the 
    former's elements. Both are returned in a tuple. 

    :type nelem: int 
    :param nelem: The desired number of elements in the to be generated 
        array. 
    :type slen: int 
    :param slen: The desired number of bytes of each array element. 
    :type pb: bool 
    :param pb: If True, displays a text progress bar during input array 
       generation. 
    ''' 
    from os import urandom 
    import sys 
    import hashlib 

    if pb: 
     if nelem <= 64: 
      toolbar_width = nelem 
      chunk_size = 1 
     else: 
      toolbar_width = 64 
      chunk_size = nelem // toolbar_width 
     description = '%d random strings of %d bytes. ' % (nelem, slen) 
     s = ''.join(('Generating an array of ', description, '...\n')) 
     sys.stdout.write(s) 
     # create an ASCII progress bar 
     sys.stdout.write("[%s]" % (" " * toolbar_width)) 
     sys.stdout.flush() 
     sys.stdout.write("\b" * (toolbar_width+1)) 
    array = list() 
    hash4a = list() 
    try: 
     for i in range(nelem): 
      e = urandom(int(slen)) 
      array.append(e) 
      h = hashlib.md5() 
      h.update(e) 
      he = h.hexdigest() 
      hash4a.append(he) 
      i += 1 
      if pb and i and i % chunk_size == 0: 
       sys.stdout.write("-") 
       sys.stdout.flush() 
     if pb: 
      sys.stdout.write("\n") 
    except MemoryError: 
     print('Memory Error: discarding existing arrays') 
     array = list() 
     hash4a = list() 
    finally: 
     return array, hash4a 

# The "ventilator" function generates an array of nelem fix-sized (of slen 
# bytes long) random strings, and sends the array down a zeromq "PUSH" 
# connection to be processed by listening workers, in a round robin load 
# balanced fashion. 

def ventilator(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to send work 
    ventilator_send = context.socket(zmq.PUSH) 
    ventilator_send.bind("tcp://127.0.0.1:5557") 

    # Give everything a second to spin up and connect 
    time.sleep(1) 

    # Create the input array 
    nelem = number_of_elements 
    slen = string_length 
    payloads = create_inputs(nelem, slen) 

    # Send an array to each worker 
    for num in range(np): 
     work_message = { 'num' : payloads } 
     ventilator_send.send_pyobj(work_message) 

    time.sleep(1) 

# The "worker" functions listen on a zeromq PULL connection for "work" 
# (array to be processed) from the ventilator, get the length of the array 
# and send the results down another zeromq PUSH connection to the results 
# manager. 

def worker(wrk_num): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to receive work from the ventilator 
    work_receiver = context.socket(zmq.PULL) 
    work_receiver.connect("tcp://127.0.0.1:5557") 

    # Set up a channel to send result of work to the results reporter 
    results_sender = context.socket(zmq.PUSH) 
    results_sender.connect("tcp://127.0.0.1:5558") 

    # Set up a channel to receive control messages over 
    control_receiver = context.socket(zmq.SUB) 
    control_receiver.connect("tcp://127.0.0.1:5559") 
    control_receiver.setsockopt(zmq.SUBSCRIBE, "") 

    # Set up a poller to multiplex the work receiver and control receiver channels 
    poller = zmq.Poller() 
    poller.register(work_receiver, zmq.POLLIN) 
    poller.register(control_receiver, zmq.POLLIN) 

    # Loop and accept messages from both channels, acting accordingly 
    while True: 
     socks = dict(poller.poll()) 

     # If the message came from work_receiver channel, get the length 
     # of the array and send the answer to the results reporter 
     if socks.get(work_receiver) == zmq.POLLIN: 
      #work_message = work_receiver.recv_json() 
      work_message = work_receiver.recv_pyobj() 
      length = len(work_message['num'][0]) 
      answer_message = { 'worker' : wrk_num, 'result' : length } 
      results_sender.send_json(answer_message) 

     # If the message came over the control channel, shut down the worker. 
     if socks.get(control_receiver) == zmq.POLLIN: 
      control_message = control_receiver.recv() 
      if control_message == "FINISHED": 
       print("Worker %i received FINSHED, quitting!" % wrk_num) 
       break 

# The "results_manager" function receives each result from multiple workers, 
# and prints those results. When all results have been received, it signals 
# the worker processes to shut down. 

def result_manager(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to receive results 
    results_receiver = context.socket(zmq.PULL) 
    results_receiver.bind("tcp://127.0.0.1:5558") 

    # Set up a channel to send control commands 
    control_sender = context.socket(zmq.PUB) 
    control_sender.bind("tcp://127.0.0.1:5559") 

    for task_nbr in range(np): 
     result_message = results_receiver.recv_json() 
     print "Worker %i answered: %i" % (result_message['worker'], result_message['result']) 

    # Signal to all workers that we are finsihed 
    control_sender.send("FINISHED") 
    time.sleep(5) 

if __name__ == "__main__": 

    # Create a pool of workers to distribute work to 
    for wrk_num in range(pool_size): 
     Process(target=worker, args=(wrk_num,)).start() 

    # Fire up our result manager... 
    result_manager = Process(target=result_manager, args=()) 
    result_manager.start() 

    # Start the ventilator! 
    ventilator = Process(target=ventilator, args=()) 
    ventilator.start() 
+0

że nie więcej doświadczeniach obniżyła number_of_elements do 64 oraz zwiększała string_length do 6. Kod jeszcze ran porządku. Powyżej tego pojawił się ten sam objaw. To doprowadziło mnie do przekonania, że ​​może istnieć ogólny limit rozmiaru wiadomości gdzieś w powiązaniu pyzmq. API 0MQ C ma tę funkcję [link] (http://api.zeromq.org/2-1:zmq-msg-init-size) zmq_msg_init_size (3), której nie mogę znaleźć w dokumentacji pyzmq. Czy to mogła być przyczyna? – user183394

+0

Czy dostaniesz tropienie w miejscu, w którym się znajduje? To może dać ci wskazówkę. –

+0

Próbowałem twojego kodu na moim macowym laptopie z string_length = 1024 * 1024 * 4 i działało dobrze, więc domyślam się, że to musi mieć coś wspólnego z jakimś sporem pamięci. –

Odpowiedz

6

Problem polega na tym, że gniazdo respiratora (PUSH) zamyka się przed zakończeniem wysyłania. Na koniec funkcji respiratora masz sen 1s, który nie wystarcza do wysłania 384 MB wiadomości. Dlatego masz próg, który masz, jeśli sen był krótszy, próg byłby niższy.

Powiedział, że LINGER jest domniemany, aby zapobiec tego rodzaju rzeczy, więc chciałbym to przynieść z zeromq: PUSH nie wydaje się szanować LINGER.

Rozwiązaniem dla konkretnego przykładu (bez dodawania niewykonanego długiego snu) byłoby użycie tego samego sygnału FINISH, aby zakończyć pracę respiratora jako pracowników. W ten sposób zagwarantujesz, że respirator przetrwa tak długo, jak musi.

poprawiona wentylator:

def ventilator(): 
    # Initialize a zeromq context 
    context = zmq.Context() 

    # Set up a channel to send work 
    ventilator_send = context.socket(zmq.PUSH) 
    ventilator_send.bind("tcp://127.0.0.1:5557") 

    # Set up a channel to receive control messages 
    control_receiver = context.socket(zmq.SUB) 
    control_receiver.connect("tcp://127.0.0.1:5559") 
    control_receiver.setsockopt(zmq.SUBSCRIBE, "") 

    # Give everything a second to spin up and connect 
    time.sleep(1) 

    # Create the input array 
    nelem = number_of_elements 
    slen = string_length 
    payloads = create_inputs(nelem, slen) 

    # Send an array to each worker 
    for num in range(np): 
     work_message = { 'num' : payloads } 
     ventilator_send.send_pyobj(work_message) 

    # Poll for FINISH message, so we don't shutdown too early 
    poller = zmq.Poller() 
    poller.register(control_receiver, zmq.POLLIN) 

    while True: 
     socks = dict(poller.poll()) 

     if socks.get(control_receiver) == zmq.POLLIN: 
      control_message = control_receiver.recv() 
      if control_message == "FINISHED": 
       print("Ventilator received FINSHED, quitting!") 
       break 
      # else: unhandled message 
+0

minrk, wielu dzięki za wnikliwą odpowiedź. Bardzo pomocne! Nie podejrzewałem wartości ZMQ_LINGER ustawionej przez zmq_setsockopt (3), ponieważ, jak powiedziałeś, wartość domyślna to -1 (nieskończona). Ładny chwyt! Na pewno podniosę kwestię najpierw z pyzmq i wspomnę o niej również na liście mailingowej zeromq. Przetestowałem twoją poprawkę aż do długości łańcuchowej ustawionej na 1024 * 1024 * 10, wydłużyłem fizyczną pamięć RAM mojego notebooka i wciąż otrzymałem oczekiwany wynik. Dzięki jeszcze raz! – user183394

+3

Może nie warto go przywoływać "pyzmq people", ponieważ to w zasadzie teraz. Pingowałem o tym libzmq i napisałem prostszy przypadek testowy w C: https://gist.github.com/1643223 – minrk

Powiązane problemy