2012-04-12 12 views
31

Próbuję użyć wieloprocesowego obiektu Pool. Chciałbym, aby każdy proces otwierał połączenie z bazą danych po uruchomieniu, a następnie używał tego połączenia do przetwarzania danych, które są przekazywane. (Zamiast otwierania i zamykania połączenia dla każdego bitu danych.) Wygląda na to, że inicjator jest dla, ale nie mogę owijać głowy, jak komunikować się pracownik i inicjator. Więc mam coś takiego:jak użyć inicjalizatora do skonfigurowania puli wieloprocesowej?

def get_cursor(): 
    return psycopg2.connect(...).cursor() 

def process_data(data): 
    # here I'd like to have the cursor so that I can do things with the data 

if __name__ == "__main__": 
    pool = Pool(initializer=get_cursor, initargs=()) 
    pool.map(process_data, get_some_data_iterator()) 

jak mogę (lub mam) dostać kursor z powrotem get_cursor() do process_data()?

Odpowiedz

63

Funkcja initialize nazywa się tak:

def worker(...): 
    ... 
    if initializer is not None: 
     initializer(*args) 

nie ma więc wartość zwracana zapisane w dowolnym miejscu. Możesz myśleć, że to cię wykańcza, ale nie! Każdy pracownik jest w oddzielnym procesie. W ten sposób można użyć zwykłej zmiennej global.

To nie jest dokładnie ładna, ale to działa:

cursor = None 
def set_global_cursor(...): 
    global cursor 
    cursor = ... 

Teraz możesz po prostu użyć cursor w funkcji process_data. Zmienna cursor w każdym oddzielnym procesie jest oddzielona od wszystkich innych procesów, więc nie występują one względem siebie.

(nie mam pojęcia, czy psycopg2 ma inny sposób na radzenie sobie z tym, że nie wiąże się z użyciem multiprocessing w pierwszej kolejności;. Ten służy jako ogólna odpowiedź na ogólny problem z modułem multiprocessing)

+8

to powinna być zaakceptowana odpowiedź. – thias

+0

@torek Czy powinienem wywołać set_global_cursor w init_worker? –

+0

@TheUnfunCat: nie wiedząc, co to jest 'init_worker' (widzę jedną w twojej odpowiedzi, ale nie ma jej w oryginalnym pytaniu) Nie mogę powiedzieć na pewno. Ogólną ideą jest zezwolenie na "wieloprocesowość".Pool', aby utworzyć pulę procesów i aby każdy z tych procesów utworzył (własną prywatną kopię) połączenia z bazą danych. Jeśli chcesz, aby tak się stało, gdy proces puli zostanie uruchomiony, użyjesz funkcji inicjalizatora. Jeśli chcesz, aby stało się to później, możesz zrobić to później. Tak czy inaczej potrzebujesz stałej zmiennej, jak w przypadku 'function.cursor' w twojej metodzie lub zwykłego' global'. – torek

4

Możesz także wysłać funkcję do inicjalizatora i utworzyć w nim połączenie. Następnie dodajesz kursor do funkcji.

def init_worker(function): 
    function.cursor = db.conn() 

Teraz możesz uzyskać dostęp do db poprzez function.cursor bez użycia globals.

+1

Czy twoja komenda procesu brzmi: p = Pula (initializer = init_worker, args = (func)); p.map (func, args_set); ?? –

+0

Tak, coś w tym stylu (pamiętam, że to działa, ale od jakiegoś czasu nie pracowałem nad związanymi z nim materiałami, więc nie pamiętam dokładnych szczegółów, nie wahaj się dv lub zmień mojej odpowiedzi,) –

7

Torek wyjaśnił już, dlaczego inicjator nie działa w tym przypadku. Jednak nie jestem fanem osobowości Zmienna globalna, więc chciałbym wkleić tutaj inne rozwiązanie.

Ideą jest użycie klasy do zawinięcia funkcji i zainicjowania klasy za pomocą zmiennej "globalnej".

class Processor(object): 
    """Process the data and save it to database.""" 

    def __init__(self, credentials): 
    """Initialize the class with 'global' variables""" 
    self.cursor = psycopg2.connect(credentials).cursor() 

    def __call__(self, data): 
    """Do something with the cursor and data""" 
    self.cursor.find(data.key) 

a następnie zadzwonić z

p = Pool(5) 
p.map(Processor(credentials), list_of_data) 

Więc pierwszy parametr zainicjowany klasę z poświadczeń, zwróci instancję klasy i mapę wywołać wystąpienie z danymi.

Chociaż nie jest to tak proste jak rozwiązanie zmiennej globalnej, zdecydowanie sugeruję unikanie zmiennej globalnej i hermetyzowanie zmiennych w pewien bezpieczny sposób. (Naprawdę chciałbym, żeby mogli pewnego dnia obsługiwać wyrażenie lambda, to znacznie ułatwi to ...)

+0

Podoba mi się ta odpowiedź, ponieważ jest ładna, ale czy nie połączy się ponownie dla każdego elementu na liście? – woot

+6

To * jest * generalnie fajnie unikać globali i możesz zrobić coś takiego, ale będziesz chciał odłożyć inicjowanie 'self.cursor', dopóki' p.map' nie odwróci instancji procesu. Oznacza to, że twoje '__init__' ustawiłoby to na' None', a '__call__' powie" jeśli self.cursor jest None: self.cursor = ... ". W końcu, naprawdę potrzebujemy pojedynczego procesu. – torek

+0

Możesz również spróbować użyć mapy z identyfikatorami wątków/procesowymi jako kluczami, więc całkowicie izolujesz połączenia na wątek/proces. –

Powiązane problemy