2015-04-21 15 views
6

Mam skrypt Pythona, który uruchamia metodę równolegle.Jak zmienić liczbę równoległych procesów?

parsers = { 
    'parser1': parser1.process, 
    'parser2': parser2.process 
} 

def process((key, value)): 
    parsers[key](value) 

pool = Pool(4) 
pool.map(process_items, items) 

process_items jest moja metoda i items jest listą krotek z dwóch elementów do każdej krotki. Lista items zawiera około 100 tysięcy elementów.

process_items następnie wywoła metodę w zależności od podanych parametrów. Moim problemem może być 70% listy, którą mogę uruchomić z dużym paralelizmem, ale pozostałe 30% może działać tylko z 1/2 wątków, inaczej spowoduje awarię poza moją kontrolą.

W moim kodzie mam około 10 różnych procesów parsera. Dla przykładu 1-8 chcę uruchomić z Pool (4), ale 9-10 Pool (2).

Jaki jest najlepszy sposób na zoptymalizowanie tego?

+0

Czy możesz użyć dwóch basenów? Najpierw utwórz pulę, która wykorzystuje wszystkie twoje rdzenie, wykonaj iterację na liście, odfiltruj wpisy wymagające zredukowanego paralelizmu i wywołaj "pool1.map" na pozostałych elementach. Następnie zamknij/przyłącz się do tej puli. Następnie utwórz nową pulę z mniejszą ilością procesów i wywołaj 'map' tylko dla wpisów w iteracji, które * do * wymagają zredukowanej równoległości. – dano

+0

To była jedyna opcja, o której mogłem pomyśleć, miałem nadzieję, że może być czystszy sposób? A może to jest wystarczająco czyste? –

+0

Inne opcje, które mogę wymyślić, są prawdopodobnie * mniej * czyste - potrzebowałbyś jakiejś synchronizacji pomiędzy wszystkimi swoimi pracownikami, aby poradzić sobie z nią za pomocą tylko jednej "puli". Musisz też poradzić sobie z przypadkami, w których niektórzy pracownicy pracują, kiedy dostaniesz się do przedmiotów, które wymagają zredukowanej równoległości, co oznacza, że ​​musisz poczekać, aż inni pracownicy zostaną skończeni, zanim go przetworzymy. Wygląda na to, że byłoby źle, gdyby to się stało. – dano

Odpowiedz

2

Myślę, że najlepszym rozwiązaniem jest użycie dwóch basenów tutaj:

from multiprocessing import Pool 
# import parsers here 

parsers = { 
    'parser1': parser1.process, 
    'parser2': parser2.process, 
    'parser3': parser3.process, 
    'parser4': parser4.process, 
    'parser5': parser5.process, 
    'parser6': parser6.process, 
    'parser7': parser7.process, 
} 

# Sets that define which items can use high parallelism, 
# and which must use low 
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"} 
low_par = {"parser2", "parser5"} 

def process_items(key, value): 
    parsers[key](value) 

def run_pool(func, items, num_items, check_set): 
    pool = Pool(num_items) 
    out = pool.map(func, (item for item in items if item[0] in check_set)) 
    pool.close() 
    pool.join() 
    return out 

if __name__ == "__main__": 
    items = [('parser2', x), ...] # Your list of tuples 
    # Process with high parallelism 
    high_results = run_pool(process_items, items, 4, high_par) 
    # Process with low parallelism 
    low_results = run_pool(process_items, items, 2, low_par) 

Próbując to zrobić w jednym Pool jest możliwe, dzięki sprytnemu użyciu prymitywów synchronizacji, ale nie sądzę, że skończy się wyglądać o wiele czystsze niż to. Może się również skończyć mniej wydajnie, ponieważ czasami twoja pula będzie musiała czekać na zakończenie pracy, więc może przetworzyć element o małej równoległości, nawet jeśli elementy o wysokim stopniu równoległości są dostępne za nim w kolejce.

To się komplikuje się nieco, jeśli potrzebne, aby uzyskać wyniki z każdego process_items rozmowy w tej samej kolejności, w jakiej spadł w oryginalnym iterable, co oznacza, że ​​wyniki z każdej Pool trzeba się połączyły, ale opiera się na swoim przykładzie I nie sądzę, że to wymóg. Daj mi znać, jeśli tak, a ja postaram się odpowiednio dostosować swoją odpowiedź.

+0

Dzięki chory, daj to. Jest to prawdopodobnie najlepszy sposób, ponieważ chcę zachować prostotę. –

1

Można określić liczbę równoległych wątków w konstruktorze dla multiprocessing.Pool:

from multiprocessing import Pool 

def f(x): 
    return x*x 

if __name__ == '__main__': 
    pool = Pool(5) # 5 is the number of parallel threads 
    print pool.map(f, [1, 2, 3]) 
+0

Przepraszam, zaktualizowałem moje pytanie, ponieważ nie wyjaśniłem się poprawnie. Już ustawiłem procesy, których tylko jeden/dwa z moich procesów nie może działać w tym paralelnie i zawiedzie. –

Powiązane problemy