Myślę, że najlepszym rozwiązaniem jest użycie dwóch basenów tutaj:
from multiprocessing import Pool
# import parsers here
parsers = {
'parser1': parser1.process,
'parser2': parser2.process,
'parser3': parser3.process,
'parser4': parser4.process,
'parser5': parser5.process,
'parser6': parser6.process,
'parser7': parser7.process,
}
# Sets that define which items can use high parallelism,
# and which must use low
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"}
low_par = {"parser2", "parser5"}
def process_items(key, value):
parsers[key](value)
def run_pool(func, items, num_items, check_set):
pool = Pool(num_items)
out = pool.map(func, (item for item in items if item[0] in check_set))
pool.close()
pool.join()
return out
if __name__ == "__main__":
items = [('parser2', x), ...] # Your list of tuples
# Process with high parallelism
high_results = run_pool(process_items, items, 4, high_par)
# Process with low parallelism
low_results = run_pool(process_items, items, 2, low_par)
Próbując to zrobić w jednym Pool
jest możliwe, dzięki sprytnemu użyciu prymitywów synchronizacji, ale nie sądzę, że skończy się wyglądać o wiele czystsze niż to. Może się również skończyć mniej wydajnie, ponieważ czasami twoja pula będzie musiała czekać na zakończenie pracy, więc może przetworzyć element o małej równoległości, nawet jeśli elementy o wysokim stopniu równoległości są dostępne za nim w kolejce.
To się komplikuje się nieco, jeśli potrzebne, aby uzyskać wyniki z każdego process_items
rozmowy w tej samej kolejności, w jakiej spadł w oryginalnym iterable, co oznacza, że wyniki z każdej Pool
trzeba się połączyły, ale opiera się na swoim przykładzie I nie sądzę, że to wymóg. Daj mi znać, jeśli tak, a ja postaram się odpowiednio dostosować swoją odpowiedź.
Czy możesz użyć dwóch basenów? Najpierw utwórz pulę, która wykorzystuje wszystkie twoje rdzenie, wykonaj iterację na liście, odfiltruj wpisy wymagające zredukowanego paralelizmu i wywołaj "pool1.map" na pozostałych elementach. Następnie zamknij/przyłącz się do tej puli. Następnie utwórz nową pulę z mniejszą ilością procesów i wywołaj 'map' tylko dla wpisów w iteracji, które * do * wymagają zredukowanej równoległości. – dano
To była jedyna opcja, o której mogłem pomyśleć, miałem nadzieję, że może być czystszy sposób? A może to jest wystarczająco czyste? –
Inne opcje, które mogę wymyślić, są prawdopodobnie * mniej * czyste - potrzebowałbyś jakiejś synchronizacji pomiędzy wszystkimi swoimi pracownikami, aby poradzić sobie z nią za pomocą tylko jednej "puli". Musisz też poradzić sobie z przypadkami, w których niektórzy pracownicy pracują, kiedy dostaniesz się do przedmiotów, które wymagają zredukowanej równoległości, co oznacza, że musisz poczekać, aż inni pracownicy zostaną skończeni, zanim go przetworzymy. Wygląda na to, że byłoby źle, gdyby to się stało. – dano