2012-03-24 30 views
5

Mam funkcję pobierania, która pobiera wiele plików równolegle. Używam multiprocessing.Pool.map_async w celu pobrania różnych fragmentów tego samego pliku. Chciałbym pokazać pasek statusu pobierania. W tym celu muszę znać całkowite bajty, które zostały już pobrane (total_bytes_dl).Udostępnianie zmiennej między procesami

pool = multiprocessing.Pool(processes) 
    mapObj = pool.map_async(f, args) 

    while not mapObj.ready(): 
     status = r"%.2f MB/%.2f MB" % (total_bytes_dl/1024.0/1024.0, filesize/1024.0/1024.0,) 
     status = status + chr(8)*(len(status)+1) 
     print status, 
     time.sleep(0.5) 

Czy istnieje sposób, aby ustawić zmienną, która będzie wspólny wśród wszystkich tych procesów i głównego procesu, dlatego każdy proces może dołączyć ilość bajtów, które właśnie ściągnięty?

Odpowiedz

3

Rozwiązaniem było intilize nowy proces i przekazać wspólną wartość ctypes:

from ctypes import c_int 
import dummy 

shared_bytes_var = multiprocessing.Value(c_int) 

def Func(...): 
    .... 
    pool = multiprocessing.Pool(initializer=_initProcess,initargs=(shared_bytes_var,)) 
    .... 

def _initProcess(x): 
    dummy.shared_bytes_var = x 
1

Oczywiście, możesz używać wspólnych wartości ctypes w pamięci współdzielonej, jeśli chcesz tylko pobrać bajty, powinno to zrobić. przekazać odpowiednią wartość każdemu pracownikowi, a proces wywołujący będzie miał do niego dostęp.

patrz: http://docs.python.org/library/multiprocessing.html#shared-ctypes-objects

+2

Nie można odwzorować ctypu udostępnionego obiektu: 'RuntimeError: Zsynchronizowane obiekty powinny być udostępniane między procesami poprzez dziedziczenie' – iTayb

0

Można użyć wieloprocesowej obiektu kolejki, żeby robotnicy mogli użyć do wysyłania danych o stanie dalej. Twój główny proces będzie musiał odczytać wpisy statusu z kolejki i odpowiednio zaktualizować status.

1

używać obiektu Queue przydzieloną tak:

que = multiprocessing.Manager().Queue() 

Przełóż tej zmiennej dla pracowników, a oni można używać od que.put(bytes) do okresowego raportowania, ile pobrano od czasu ostatniego raportu. Ci potem po prostu sprawdzić rozmiar kolejki i pociągnąć w dowolnych raportów przychodzących:

downloaded = 0 
while not mapObj.ready(): 
    for _ in range(q.qsize()): 
     downloaded += q.get() 
    print downloaded, r"bytes downloaded\r", 
    time.sleep(0.5) 

Uwaga: Chociaż Moduł zapewnia również sposób multiprocessing.Queue(), nie jest w pełni równoważne multiprocessing.Manager().Queue(). Zobacz this question i odpowiedź.

Powiązane problemy