2012-06-15 9 views
19

Próbuję zwrócić wartości z podprocesów, ale te wartości są niestety nieodkryte. Więc użyłem zmiennych globalnych w module wątków z powodzeniem, ale nie mogłem pobrać aktualizacji wykonanych w podprocesach przy użyciu modułu wieloprocesorowego. Mam nadzieję, że czegoś brakuje.Aktualizacje globalnej aktualizacji wieloprocesorowej Python nie powróciły do ​​rodzica

Wyniki wydrukowane na końcu są zawsze takie same jak wartości początkowe, biorąc pod uwagę dane vale dataDV03 i dataDV04. Podprocesy aktualizują te zmienne globalne, ale te zmienne globalne pozostają niezmienione w obiekcie nadrzędnym.

import multiprocessing 

# NOT ABLE to get python to return values in passed variables. 

ants = ['DV03', 'DV04'] 
dataDV03 = ['', ''] 
dataDV04 = {'driver': '', 'status': ''} 


def getDV03CclDrivers(lib): # call global variable 
    global dataDV03 
    dataDV03[1] = 1 
    dataDV03[0] = 0 
# eval('CCL.' + lib + '.' + lib + '("DV03")') these are unpicklable instantiations 

def getDV04CclDrivers(lib, dataDV04): # pass global variable 
    dataDV04['driver'] = 0 # eval('CCL.' + lib + '.' + lib + '("DV04")') 


if __name__ == "__main__": 

    jobs = [] 
    if 'DV03' in ants: 
     j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',)) 
     jobs.append(j) 

    if 'DV04' in ants: 
     j = multiprocessing.Process(target=getDV04CclDrivers, args=('LORR', dataDV04)) 
     jobs.append(j) 

    for j in jobs: 
     j.start() 

    for j in jobs: 
     j.join() 

    print 'Results:\n' 
    print 'DV03', dataDV03 
    print 'DV04', dataDV04 

Nie mogę wysłać do mojego pytania, więc spróbuję edytować oryginał.

Oto przedmiot, który nie jest picklable:

In [1]: from CCL import LORR 
In [2]: lorr=LORR.LORR('DV20', None) 
In [3]: lorr 
Out[3]: <CCL.LORR.LORR instance at 0x94b188c> 

Jest to błąd zwracany, gdy używam multiprocessing.Pool powrócić instancję z powrotem do rodziców:

Thread getCcl (('DV20', 'LORR'),) 
Process PoolWorker-1: 
Traceback (most recent call last): 
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap 
self.run() 
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 88, in run 
self._target(*self._args, **self._kwargs) 
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/pool.py", line 71, in worker 
put((job, i, result)) 
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/queues.py", line 366, in put 
return send(obj) 
UnpickleableError: Cannot pickle <type 'thread.lock'> objects 



In [5]: dir(lorr) 
Out[5]: 
['GET_AMBIENT_TEMPERATURE', 
'GET_CAN_ERROR', 
'GET_CAN_ERROR_COUNT', 
'GET_CHANNEL_NUMBER', 
'GET_COUNT_PER_C_OP', 
'GET_COUNT_REMAINING_OP', 
'GET_DCM_LOCKED', 
'GET_EFC_125_MHZ', 
'GET_EFC_COMB_LINE_PLL', 
'GET_ERROR_CODE_LAST_CAN_ERROR', 
'GET_INTERNAL_SLAVE_ERROR_CODE', 
'GET_MAGNITUDE_CELSIUS_OP', 
'GET_MAJOR_REV_LEVEL', 
'GET_MINOR_REV_LEVEL', 
'GET_MODULE_CODES_CDAY', 
'GET_MODULE_CODES_CMONTH', 
'GET_MODULE_CODES_DIG1', 
'GET_MODULE_CODES_DIG2', 
'GET_MODULE_CODES_DIG4', 
'GET_MODULE_CODES_DIG6', 
'GET_MODULE_CODES_SERIAL', 
'GET_MODULE_CODES_VERSION_MAJOR', 
'GET_MODULE_CODES_VERSION_MINOR', 
'GET_MODULE_CODES_YEAR', 
'GET_NODE_ADDRESS', 
'GET_OPTICAL_POWER_OFF', 
'GET_OUTPUT_125MHZ_LOCKED', 
'GET_OUTPUT_2GHZ_LOCKED', 
'GET_PATCH_LEVEL', 
'GET_POWER_SUPPLY_12V_NOT_OK', 
'GET_POWER_SUPPLY_15V_NOT_OK', 
'GET_PROTOCOL_MAJOR_REV_LEVEL', 
'GET_PROTOCOL_MINOR_REV_LEVEL', 
'GET_PROTOCOL_PATCH_LEVEL', 
'GET_PROTOCOL_REV_LEVEL', 
'GET_PWR_125_MHZ', 
'GET_PWR_25_MHZ', 
'GET_PWR_2_GHZ', 
'GET_READ_MODULE_CODES', 
'GET_RX_OPT_PWR', 
'GET_SERIAL_NUMBER', 
'GET_SIGN_OP', 
'GET_STATUS', 
'GET_SW_REV_LEVEL', 
'GET_TE_LENGTH', 
'GET_TE_LONG_FLAG_SET', 
'GET_TE_OFFSET_COUNTER', 
'GET_TE_SHORT_FLAG_SET', 
'GET_TRANS_NUM', 
'GET_VDC_12', 
'GET_VDC_15', 
'GET_VDC_7', 
'GET_VDC_MINUS_7', 
'SET_CLEAR_FLAGS', 
'SET_FPGA_LOGIC_RESET', 
'SET_RESET_AMBSI', 
'SET_RESET_DEVICE', 
'SET_RESYNC_TE', 
'STATUS', 
'_HardwareDevice__componentName', 
'_HardwareDevice__hw', 
'_HardwareDevice__stickyFlag', 
'_LORRBase__logger', 
'__del__', 
'__doc__', 
'__init__', 
'__module__', 
'_devices', 
'clearDeviceCommunicationErrorAlarm', 
'getControlList', 
'getDeviceCommunicationErrorCounter', 
'getErrorMessage', 
'getHwState', 
'getInternalSlaveCanErrorMsg', 
'getLastCanErrorMsg', 
'getMonitorList', 
'hwConfigure', 
'hwDiagnostic', 
'hwInitialize', 
'hwOperational', 
'hwSimulation', 
'hwStart', 
'hwStop', 
'inErrorState', 
'isMonitoring', 
'isSimulated'] 

In [6]: 
+1

„wartości te są unpickleable” - masz na myśli rzeczy swoje opakowania do swojego globalne nie są w stanie być marynowane? W takim przypadku nie można użyć podprocesu (AFAIK), ponieważ w ten sposób przekazywane są informacje między procesami. Jeśli dane można przetrawić, użyjesz 'Menedżera'. – mgilson

+0

Naprawdę nie powinno się "publikować" niczego oprócz odpowiedzi na swoje pytanie. Dobrze więc, że zamiast tego zredagowałeś; to jest właściwa rzecz w tym przypadku. – senderle

+0

Inną alternatywą, którą ostatnio odkryłem, jest wywołanie zwrotne 'apply_async'. Wywołanie zwrotne zostanie wykonane w procesie nadrzędnym. Oznacza to, że wszystko, co podprocesor zwróci, może zostać przekazane do procesu oddzwaniania, a proces wywołania zwrotnego może następnie zmutować globale. Wymaga to jednak użycia deklaracji "globalnameName" u góry funkcji wywołania zwrotnego. – CMCDragonkai

Odpowiedz

4

Podczas korzystania multiprocess, jedynym sposobem przekazywania obiektów między procesami jest użycie Queue lub Pipe; globale nie są udostępniane. Obiekty muszą być dostępne, więc multiprocess nie pomoże tutaj.

+0

Procesor wieloprocesowy ma Menedżera do przesuwania danych pomiędzy procesami. – jdi

+0

Kolejki są znowu jedynym sposobem przekazania wartości do funkcji nadrzędnej lub tylko dla tych samych funkcji poziomu? . – erogol

3

@DBlas daje szybki url i odniesienie do klasy manager w odpowiedzi, ale myślę, że jej jeszcze trochę niejasne, więc pomyślałem, że może to być pomocne, aby po prostu zobaczyć to stosowane ...

import multiprocessing 
from multiprocessing import Manager 

ants = ['DV03', 'DV04'] 

def getDV03CclDrivers(lib, data_dict): 
    data_dict[1] = 1 
    data_dict[0] = 0 

def getDV04CclDrivers(lib, data_list): 
    data_list['driver'] = 0 


if __name__ == "__main__": 

    manager = Manager() 
    dataDV03 = manager.list(['', '']) 
    dataDV04 = manager.dict({'driver': '', 'status': ''}) 

    jobs = [] 
    if 'DV03' in ants: 
     j = multiprocessing.Process(
       target=getDV03CclDrivers, 
       args=('LORR', dataDV03)) 
     jobs.append(j) 

    if 'DV04' in ants: 
     j = multiprocessing.Process(
       target=getDV04CclDrivers, 
       args=('LORR', dataDV04)) 
     jobs.append(j) 

    for j in jobs: 
     j.start() 

    for j in jobs: 
     j.join() 

    print 'Results:\n' 
    print 'DV03', dataDV03 
    print 'DV04', dataDV04 

Ponieważ proces wieloprocesowy w rzeczywistości wykorzystuje oddzielne procesy, nie można po prostu udostępniać zmiennych globalnych, ponieważ będą one w zupełnie innych "przestrzeniach" w pamięci. To, co robisz globalnie w ramach jednego procesu, nie będzie odzwierciedlało się w innym. Chociaż przyznaję, że wydaje się to mylące, ponieważ tak jak to widzisz, wszyscy żyją tam w tym samym kodzie, więc "dlaczego te metody nie miałyby dostępu do globalnego"? Trudniej jest oprzeć się myśli, że będą działać w różnych procesach.

Podano, że Manager class służy jako proxy dla struktur danych, które mogą przesyłać informacje między procesami. To co zrobisz, to stworzyć specjalny dykt i listę od menedżera, przekazać je do swoich metod i działać na nich lokalnie.

Un-marynowane-stanie danych

Dla specjalizują LORR obiektu, może być konieczne, aby stworzyć coś w rodzaju proxy, który może reprezentować pickable stanu instancji.

Niezwykle wytrzymała lub przetestowana, ale daje pomysł.

class LORRProxy(object): 

    def __init__(self, lorrObject=None): 
     self.instance = lorrObject 

    def __getstate__(self): 
     # how to get the state data out of a lorr instance 
     inst = self.instance 
     state = dict(
      foo = inst.a, 
      bar = inst.b, 
     ) 
     return state 

    def __setstate__(self, state): 
     # rebuilt a lorr instance from state 
     lorr = LORR.LORR() 
     lorr.a = state['foo'] 
     lorr.b = state['bar'] 
     self.instance = lorr 
+0

Ale obiekty 'Manager' używają' pikle' do przesyłania danych. Więc jeśli dane PO są naprawdę nietrwałe, myślę, że to nie zadziała. – senderle

+1

@senderle: To może być prawdą, ale z drugiej strony OP nie podał jeszcze żadnego przykładu tego, co dane nie nadające się do dystrybucji. Mogę tylko odpowiedzieć na to, co widzę :-) – jdi

+0

Dzięki za szczegółowe odpowiedzi. Zgodnie z odpowiedziami, zakleszczenia są 1) wartościami, które muszę przekazać do nadrzędnych błędów powrotu piklowania, a podprocesy w oddzielnych przestrzeniach zapobiegają zwrotowi zmiennych globalnych. – Buoy

25

Podczas korzystania multiprocessing otworzyć drugą sposobie, całkowicie nowa instancja Pythona, z własnym stanem globalnej, jest tworzony. Ten globalny stan nie jest dzielony, więc zmiany dokonane przez procesy potomne na zmienne globalne będą niewidoczne dla procesu nadrzędnego.

Dodatkowo większość abstrakcji, które oferuje multiprocessing, zapewnia pikle do przesyłania danych. Wszystkie dane przesyłane przy użyciu serwerów proxy must be pickleable; obejmuje wszystkie obiekty, które Manager provides.Istotne cytaty (podkreślenie moje):

Ensure that the arguments to the methods of proxies are picklable.

A (w sekcji Manager):

Other processes can access the shared objects by using proxies.

Queue s wymagają również pickleable danych; docs nie mów tak, ale szybkie badanie potwierdza, że:

import multiprocessing 
import pickle 

class Thing(object): 
    def __getstate__(self): 
     print 'got pickled' 
     return self.__dict__ 
    def __setstate__(self, state): 
     print 'got unpickled' 
     self.__dict__.update(state) 

q = multiprocessing.Queue() 
p = multiprocessing.Process(target=q.put, args=(Thing(),)) 
p.start() 
print q.get() 
p.join() 

wyjściowa:

$ python mp.py 
got pickled 
got unpickled 
<__main__.Thing object at 0x10056b350> 

Jedno podejście potęga pracy dla Ciebie, jeśli naprawdę nie może marynowane danych, jest znalezienie sposobu przechowywania go jako obiektu ctype; odniesienie do pamięci może wtedy być passed to a child process. Wydaje mi się to dość podejrzane; Nigdy tego nie robiłem. Ale może to być możliwe rozwiązanie dla ciebie.

Wygląda na to, że musisz wiedzieć o wiele więcej na temat elementów wewnętrznych modelu LORR. Czy LORR jest klasą? Czy możesz z tego zreklasować? Czy jest to podklasa czegoś innego? Jakie jest jego MRO? (Wypróbuj LORR.__mro__ i opublikuj dane wyjściowe, jeśli działa.) Jeśli jest to czysty obiekt python, możliwe jest jego podklasy, tworząc __setstate__ i __getstate__, aby umożliwić trawienie.

Innym podejściem może być ustalenie, jak uzyskać odpowiednie dane z instancji LORR i przekazanie jej za pomocą prostego łańcucha. Skoro mówisz, że naprawdę chcesz tylko wywołać metody obiektu, dlaczego nie po prostu to zrobić, używając Queue s do wysyłania wiadomości tam i z powrotem? Innymi słowy, coś jak to (w uproszczeniu):

Main Process    Child 1      Child 2 
          LORR 1      LORR 2 
child1_in_queue  -> get message 'foo' 
          call 'foo' method 
child1_out_queue <- return foo data string 
child2_in_queue     ->     get message 'bar' 
                 call 'bar' method 
child2_out_queue     <-     return bar data string 
+0

@ user1459256, rozważ moją edycję (u dołu mojego posta). Potrzebujemy więcej informacji o obiektach "LORR", aby opracować możliwe podejście. – senderle

+0

@ user1459256, OK, kiedy mówisz, że musisz wywołać metody później, aby uzyskać aktualne dane po wywołaniu - to sprawia, że ​​myślę, że naprawdę nie musisz przenosić obiektów 'LORR' w ogóle, ale że raczej trzeba przenieść dane zwrócone przez metodę na obiekcie 'LORR'. Ale to powinno być łatwe! Wystarczy użyć przekazu wiadomości za pośrednictwem 'Kolejki', aby powiedzieć procesowi podrzędnemu, aby wywołać określoną metodę, a następnie, aby dziecko zwróciło wartość przez zwrot' Kolejka'. – senderle

+0

@ user1459256, zobacz moją ostatnią edycję i daj mi znać, jeśli zaproponowane rozwiązanie zadziała. – senderle

1

używam p.map() w celu wydzielenia szereg procesów na zdalnych serwerach i wydrukować wyniki, gdy pochodzą one z powrotem w nieprzewidywalnych czasach:

Servers=[...] 
from multiprocessing import Pool 
p=Pool(len(Servers)) 
p.map(DoIndividualSummary, Servers) 

To zadziałało, jeśli DoIndividualSummary użyłdla wyników, ale ogólny wynik był nieprzewidywalny, co utrudniało interpretację. Próbowałem wielu podejść do korzystania z zmiennych globalnych, ale napotkałem problemy. Wreszcie udało mi się z sqlite3.

Przed p.map() otwórz sqlite połączenie i utworzyć tabelę:

import sqlite3 
conn=sqlite3.connect('servers.db') # need conn for commit and close 
db=conn.cursor() 
try: db.execute('''drop table servers''') 
except: pass 
db.execute('''CREATE TABLE servers (server text, serverdetail text, readings  text)''') 
conn.commit() 

Następnie, po powrocie z DoIndividualSummary(), zapisanie wyników do tabeli:

db.execute('''INSERT INTO servers VALUES (?,?,?)''',   (server,serverdetail,readings)) 
conn.commit() 
return 

Po map() oświadczenie, druk wyniki:

db.execute('''select * from servers order by server''') 
rows=db.fetchall() 
for server,serverdetail,readings in rows: print serverdetail,readings 

maj wydaje się być przesadą, ale było to dla mnie prostsze niż zalecane rozwiązania.

3

Można również użyć numeru multiprocessing Array. Pozwala to na współdzielenie stanu między procesami i jest prawdopodobnie najbliższą zmiennej globalnej.

U góry głównej strony zadeklaruj tablicę. Pierwszy argument "i" mówi, że będą to liczby całkowite.Drugi argument daje początkowe wartości:

shared_dataDV03 = multiprocessing.Array ('i', (0, 0)) #a shared array 

Następnie przekazać tę tablicę do procesu jako argument:

j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',shared_dataDV03)) 

Musisz otrzymać argument tablicy w funkcji ich nazwie, a następnie można zmodyfikować go w funkcji:

def getDV03CclDrivers(lib,arr): # call global variable 
    arr[1]=1 
    arr[0]=0 

tablica jest wspólną z rodzicem, więc można wydrukować wartości na koniec w dominującej:

print 'DV03', shared_dataDV03[:] 

i pokaże zmiany:

DV03 [0, 1]