2012-08-16 22 views
18

Chciałbym jednocześnie uruchamiać wiele wystąpień programu program.py, jednocześnie ograniczając liczbę uruchomionych instancji jednocześnie (np. Do liczby rdzeni procesora dostępnych w moim systemie). Na przykład, jeśli mam 10 rdzeni i muszę wykonać 1000 uruchomień programu program.py, tylko 10 wystąpień zostanie utworzonych i uruchomionych w danym momencie.Wieloprocesorowość w Pythonie przy ograniczeniu liczby uruchomionych procesów

Próbowałem już używać modułu wieloprocesorowego, wielowątkowości i korzystania z kolejek, ale nie wydaje mi się, żebym mógł go łatwo wdrożyć. Największym problemem, jaki mam, jest znalezienie sposobu na ograniczenie liczby uruchomionych jednocześnie procesów. Jest to ważne, ponieważ jeśli utworzę 1000 procesów jednocześnie, stanie się to odpowiednikiem bomby widłowej. Nie potrzebuję programowo zwracać wyników z procesów (są one wysyłane na dysk), a wszystkie procesy działają niezależnie od siebie.

Czy ktoś może podać mi sugestie lub przykład, w jaki sposób mogę wdrożyć to w python, a nawet bash? Wcześniej pisałem kod, który napisałem do tej pory, używając kolejek, ale nie działa on tak, jak powinien i może być już w złym kierunku.

Wielkie dzięki.

+2

Wypróbowałeś [Python Process pools] (http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool)? – C2H5OH

+0

Najprostszym sposobem na to jest utworzenie programu "kontroler", który tworzy plik "multiprocessing.pool" i spawns wątków procesu roboczego (program.py), ponowne przydzielanie pracy jako zakończeniu instancji. – jozzas

+0

Dzięki, spróbuję tego; w mojej pierwszej próbie z jakiegoś powodu doszedłem do wniosku, że multiprocessing.pool nie jest tym, czego chciałem, ale teraz wydaje się słuszne. W tym przypadku wątki robocze po prostu odrodzą program.py (jako wątek? Z podprocesorem.Popen)? Czy mógłbyś opublikować przybliżony przykład lub implementację szablonu, którą mógłbym wykonać? – steadfast

Odpowiedz

2

Bash skrypt zamiast Pythonie, ale używam go często do prostego przetwarzania równoległego:

#!/usr/bin/env bash 
waitForNProcs() 
{ 
nprocs=$(pgrep -f $procName | wc -l) 
while [ $nprocs -gt $MAXPROCS ]; do 
    sleep $SLEEPTIME 
    nprocs=$(pgrep -f $procName | wc -l) 
done 
} 
SLEEPTIME=3 
MAXPROCS=10 
procName=myPython.py 
for file in ./data/*.txt; do 
waitForNProcs 
./$procName $file & 
done 

Albo za bardzo prostych przypadkach, innym rozwiązaniem jest xargs gdzie P określa liczbę proca

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
3

Powinieneś używać nadzorcy procesu. Jednym z podejść byłoby użycie API dostarczonego przez Circus, aby zrobić to "programowo", witryna z dokumentacją jest teraz w trybie offline, ale myślę, że to tylko tymczasowy problem, w każdym razie, możesz użyć Cyrku, aby sobie z tym poradzić. Innym podejściem byłoby użycie supervisord i ustawienie parametru numprocs procesu na liczbę posiadanych rdzeni.

Przykładem korzystania Circus:

from circus import get_arbiter 

arbiter = get_arbiter("myprogram", numprocesses=3) 
try: 
    arbiter.start() 
finally: 
    arbiter.stop() 
21

wiem wspomniałeś, że podejście Pool.map nie ma sensu do Ciebie. Mapa jest po prostu łatwym sposobem, aby nadać jej źródło pracy i możliwość wywoływania każdego z przedmiotów. func dla mapy może być dowolnym punktem wejścia do rzeczywistej pracy na danym arg.

Jeśli to nie wydaje się słuszne dla ciebie, mam dość szczegółową odpowiedź tutaj o użyciu wzorca Producent konsumentów: https://stackoverflow.com/a/11196615/496445

Zasadniczo utworzyć kolejkę, a zacząć N Liczba pracowników. Następnie możesz podać kolejkę z głównego wątku lub utworzyć proces Producer, który przekazuje kolejkę. Pracownicy po prostu kontynuują pracę z kolejki i nigdy nie będzie więcej równoległej pracy niż liczba uruchomionych procesów.

Istnieje również opcja ograniczenia limitu w kolejce, aby zablokować producenta, gdy jest już zbyt dużo zaległej pracy, jeśli trzeba wprowadzić ograniczenia również na szybkość i zasoby, które zużywa producent.

Funkcja robocza, która zostanie wywołana, może wykonać dowolną czynność. Może to być wrapper do niektórych poleceń systemu lub może zaimportować twoją bibliotekę python i uruchomić główną procedurę. Istnieją specyficzne systemy zarządzania procesami, które pozwalają skonfigurować konfiguracje do uruchamiania dowolnych plików wykonywalnych przy ograniczonych zasobach, ale jest to tylko podstawowe podejście do wykonywania tego zadania.

Fragmenty z tego other answer kopalni:

Podstawowe Basen:

from multiprocessing import Pool 

def do_work(val): 
    # could instantiate some other library class, 
    # call out to the file system, 
    # or do something simple right here. 
    return "FOO: %s" % val 

pool = Pool(4) 
work = get_work_args() 
results = pool.map(do_work, work) 

Używanie menedżera procesów i producent

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 

     # exit signal 
     if item == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = item 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    # this could also be started in a producer process 
    # instead of blocking 
    iters = itertools.chain(get_work_args(), (None,)*num_workers) 
    for item in iters: 
     work.put(item) 

    for p in pool: 
     p.join() 

    print results 
+0

Bardzo dobry przykład, poprawiłem go, zdobywając liczbę CPUS, tak jak wyjaśniają w http://stackoverflow.com/questions/6905264/python-multiprocessing-utilizes-only-one-core i dzięki temu mogłem dinamically ustawić num_workers na podstawie procesory komputera. –

0

Chociaż istnieje wiele odpowiedzi na temat korzystania wieloprocesorowe .pool, nie ma wielu fragmentów kodu na h dzięki użyciu trybu wieloprocesowego. Proces, który jest rzeczywiście bardziej korzystny, gdy liczy się wykorzystanie pamięci. uruchomienie 1000 procesów spowoduje przeciążenie procesora i zabicie pamięci. Jeśli każdy proces i jego potoki danych są obciążone dużą ilością pamięci, system operacyjny lub sam Python ograniczy liczbę równoległych procesów. Opracowałem poniższy kod, aby ograniczyć jednoczesną liczbę zadań wysyłanych do procesora w partiach. Wielkość partii można skalować proporcjonalnie do liczby rdzeni procesora. Na moim komputerze z systemem Windows liczba zadań na partię może być wydajna nawet 4 razy w porównaniu z dostępnymi procesorami.

import multiprocessing 
def func_to_be_multiprocessed(q,data): 
    q.put(('s')) 
q = multiprocessing.Queue() 
worker = [] 
for p in range(number_of_jobs): 
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ 
     args=(q,data)...)) 
num_cores = multiprocessing.cpu_count() 
Scaling_factor_batch_jobs = 3.0 
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs 
num_of_batches = number_of_jobs // num_jobs_per_batch 
for i_batch in range(num_of_batches): 
    floor_job = i_batch * num_jobs_per_batch 
    ceil_job = floor_job + num_jobs_per_batch 
    for p in worker[floor_job : ceil_job]: 
             worker.start() 
    for p in worker[floor_job : ceil_job]: 
             worker.join() 
for p in worker[ceil_job :]: 
          worker.start() 
for p in worker[ceil_job :]: 
          worker.join() 
for p in multiprocessing.active_children(): 
          p.terminate() 
result = [] 
for p in worker: 
    result.append(q.get()) 

Jedynym problemem jest to, czy któryś z pracy w każdej partii nie udało się wykonać i prowadzi do sytuacji, wiszącej, reszta partii pracy nie zostaną zainicjowane. Tak więc funkcja do przetworzenia musi mieć odpowiednie procedury obsługi błędów.

Powiązane problemy