2011-08-26 13 views
41

Najpierw szukałem informacji i nie mogłem znaleźć odpowiedzi na moje pytanie. Próbuję uruchomić wiele funkcji równolegle w Pythonie.Python: Jak uruchomić funkcje Pythona równolegle?

mam coś takiego:

files.py 

import common #common is a util class that handles all the IO stuff 

dir1 = 'C:\folder1' 
dir2 = 'C:\folder2' 
filename = 'test.txt' 
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45] 

def func1(): 
    c = common.Common() 
    for i in range(len(addFiles)): 
     c.createFiles(addFiles[i], filename, dir1) 
     c.getFiles(dir1) 
     time.sleep(10) 
     c.removeFiles(addFiles[i], dir1) 
     c.getFiles(dir1) 

def func2(): 
    c = common.Common() 
    for i in range(len(addFiles)): 
     c.createFiles(addFiles[i], filename, dir2) 
     c.getFiles(dir2) 
     time.sleep(10) 
     c.removeFiles(addFiles[i], dir2) 
     c.getFiles(dir2) 

chcę zadzwonić func1 i func2 i je uruchomić w tym samym czasie. Funkcje nie współdziałają ze sobą ani z tym samym obiektem. Teraz muszę poczekać na zakończenie func1, zanim rozpocznie się func2. Jak mogę zrobić coś jak poniżej:

process.py 

from files import func1, func2 

runBothFunc(func1(), func2()) 

Chcę, aby móc stworzyć zarówno katalogi dość blisko do tego samego czasu, ponieważ każdy min liczę ile pliki są tworzone. Jeśli katalog nie będzie tam, to zrzuci mój czas.

+1

Pytanie o aktualizację – lmcadory

+1

Możesz chcieć przebudować to; jeśli zliczasz liczbę plików/folderów co minutę, tworzysz warunki wyścigu. Co powiesz na to, aby każda funkcja aktualizowała licznik, lub używał pliku blokującego, aby upewnić się, że proces okresowy nie aktualizuje licznika, dopóki obie funkcje nie zostaną zakończone? –

Odpowiedz

73

Można użyć threading lub multiprocessing.

Z powodu peculiarities of CPython, mało prawdopodobne jest osiągnięcie rzeczywistego równoległości. Z tego powodu zazwyczaj lepszym rozwiązaniem jest multiprocessing.

Powyżej znajduje się pełna przykład:

from multiprocessing import Process 

def func1(): 
    print 'func1: starting' 
    for i in xrange(10000000): pass 
    print 'func1: finishing' 

def func2(): 
    print 'func2: starting' 
    for i in xrange(10000000): pass 
    print 'func2: finishing' 

if __name__ == '__main__': 
    p1 = Process(target=func1) 
    p1.start() 
    p2 = Process(target=func2) 
    p2.start() 
    p1.join() 
    p2.join() 

Mechanika rozruchu/łączenie procesów potomnych można łatwo zamknąć w funkcji wzdłuż linii swojej runBothFunc:

def runInParallel(*fns): 
    proc = [] 
    for fn in fns: 
    p = Process(target=fn) 
    p.start() 
    proc.append(p) 
    for p in proc: 
    p.join() 

runInParallel(func1, func2) 
+2

Użyłem twojego kodu, ale funkcje nadal nie zaczęły się w tym samym czasie. – lmcadory

+2

@Lamar McAdours: Proszę wyjaśnić, co dokładnie oznacza "w tym samym czasie", być może dając konkretny przykład tego, co zrobiłeś, czego się spodziewałeś i co się właściwie wydarzyło. – NPE

+3

@Lamar: Nigdy nie możesz mieć gwarancji "dokładnie w tym samym czasie" i myślenie, że możesz to po prostu źle. W zależności od tego, ile masz cpusu, obciążenie komputera, czas wielu zdarzeń na komputerze będzie miał wpływ na czas rozpoczęcia wątków/procesu. Ponadto, ponieważ procesy są uruchamiane zaraz po utworzeniu, koszty związane z tworzeniem procesu również muszą zostać obliczone w wyświetlonej różnicy czasu. – Martin

3

Nie ma mowy, aby zagwarantować, że dwie funkcje będą wykonywane w synchronizacji ze sobą, co wydaje się być tym, co chcesz zrobić.

Najlepsze, co można zrobić, to podzielić funkcję na kilka kroków, a następnie zaczekać, aż oba zakończą się w krytycznych punktach synchronizacji, używając Process.join, takich jak wspomniane odpowiedzi @ aix.

To jest lepsze niż time.sleep(10), ponieważ nie można zagwarantować dokładnych ustawień czasowych. Z wyraźnym oczekiwaniem mówisz, że funkcje muszą zostać wykonane, wykonując ten krok przed przejściem do następnego, zamiast zakładać, że zostanie to wykonane w ciągu 10 ms, co nie jest gwarantowane w oparciu o to, co jeszcze dzieje się na komputerze.

3

Jeśli jesteś użytkownikiem systemu Windows i używasz Pythona 3, to ten post pomoże ci w równoległym programowaniu w Pythonie. Gdy uruchomisz zwykłe programowanie puli biblioteki wieloprocesorowej, otrzymasz błąd dotyczący głównej funkcji w twoim programie . Dzieje się tak dlatego, że okna nie mają funkcji fork(). Poniższy post daje rozwiązanie wspomnianego problemu.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Ponieważ używałem Pythonie 3, zmieniłem program trochę tak:

from types import FunctionType 
import marshal 

def _applicable(*args, **kwargs): 
    name = kwargs['__pw_name'] 
    code = marshal.loads(kwargs['__pw_code']) 
    gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls']) 
    defs = marshal.loads(kwargs['__pw_defs']) 
    clsr = marshal.loads(kwargs['__pw_clsr']) 
    fdct = marshal.loads(kwargs['__pw_fdct']) 
    func = FunctionType(code, gbls, name, defs, clsr) 
    func.fdct = fdct 
    del kwargs['__pw_name'] 
    del kwargs['__pw_code'] 
    del kwargs['__pw_defs'] 
    del kwargs['__pw_clsr'] 
    del kwargs['__pw_fdct'] 
    return func(*args, **kwargs) 

def make_applicable(f, *args, **kwargs): 
    if not isinstance(f, FunctionType): raise ValueError('argument must be a function') 
    kwargs['__pw_name'] = f.__name__ # edited 
    kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited 
    kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited 
    kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited 
    kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited 
    return _applicable, args, kwargs 

def _mappable(x): 
    x,name,code,defs,clsr,fdct = x 
    code = marshal.loads(code) 
    gbls = globals() #gbls = marshal.loads(gbls) 
    defs = marshal.loads(defs) 
    clsr = marshal.loads(clsr) 
    fdct = marshal.loads(fdct) 
    func = FunctionType(code, gbls, name, defs, clsr) 
    func.fdct = fdct 
    return func(x) 

def make_mappable(f, iterable): 
    if not isinstance(f, FunctionType): raise ValueError('argument must be a function') 
    name = f.__name__ # edited 
    code = marshal.dumps(f.__code__) # edited 
    defs = marshal.dumps(f.__defaults__) # edited 
    clsr = marshal.dumps(f.__closure__) # edited 
    fdct = marshal.dumps(f.__dict__) # edited 
    return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable) 

Po tej funkcji, powyższy kod problemem jest również zmienił trochę tak:

from multiprocessing import Pool 
from poolable import make_applicable, make_mappable 

def cube(x): 
    return x**3 

if __name__ == "__main__": 
    pool = Pool(processes=2) 
    results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)] 
    print([result.get(timeout=10) for result in results]) 

I mam wyjścia jak:

[1, 8, 27, 64, 125, 216] 

Myślę, że ten wpis może być przydatny dla niektórych użytkowników systemu Windows.

Powiązane problemy