Jedną z gotowych opcji, która może w tym pomóc jest twisted.internet.defer.DeferredSemaphore
. Jest to asynchroniczna wersja normalnego (liczącego) semafora, który możesz już znać, jeśli zrobiłeś wiele programów z gwintami.
Semafor (zliczający) przypomina trochę muteks (blokada). Ale tam, gdzie mutex może być pozyskany tylko raz, aż do wydania odpowiedniego, semafor (zliczający) może być skonfigurowany, aby umożliwić dowolne (ale określone) liczby przejęć, aby odnieść sukces zanim jakiekolwiek odpowiednie wydania są wymagane.
Oto przykład użycia DeferredSemaphore
uruchomić dziesięć operacji asynchronicznych, ale by uruchomić co najwyżej trzy z nich na raz:
from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def main():
sem = DeferredSemaphore(3)
jobs = []
for i in range(10):
jobs.append(sem.run(async, i))
d = gatherResults(jobs)
d.addCallback(lambda ignored: reactor.stop())
reactor.run()
if __name__ == '__main__':
main()
DeferredSemaphore
ma również wyraźne acquire
i release
metod, ale metoda run
jest tak wygodny prawie zawsze jest to, czego chcesz. Wywołuje metodę acquire
, która zwraca wartość Deferred
. Do tego pierwszego Deferred
dodaje wywołanie zwrotne, które wywołuje funkcję, którą przekazałeś (wraz z dowolnymi argumentami pozycyjnymi lub słowami kluczowymi). Jeśli ta funkcja zwróci Deferred
, to do tego drugiego Deferred
zostanie dodane wywołanie zwrotne, które wywołuje metodę release
.
Sprawa synchroniczna jest obsługiwana również przez natychmiastowe wywołanie release
. Błędy są również obsługiwane, umożliwiając im propagowanie, ale upewniając się, że konieczne jest wykonanie wymaganego release
, aby pozostawić spójny stan DeferredSemaphore
. Wynik funkcji przekazanej do run
(lub wyniku, który zwraca Deferred
) staje się wynikiem wartości Deferred
zwróconej przez run
.
Inne możliwe podejście może być oparte na DeferredQueue
i cooperate
. DeferredQueue
jest zwykle jak normalna kolejka, ale jego metoda get
zwraca wartość Deferred
. Jeśli w momencie połączenia nie ma żadnych elementów w kolejce, Deferred
nie zostanie uruchomiony, dopóki element nie zostanie dodany.
Oto przykład:
from random import randrange
from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor
def async(n):
print 'Starting job', n
d = deferLater(reactor, n, lambda: None)
def cbFinished(ignored):
print 'Finishing job', n
d.addCallback(cbFinished)
return d
def assign(jobs):
# Create new jobs to be processed
jobs.put(randrange(10))
reactor.callLater(randrange(10), assign, jobs)
def worker(jobs):
while True:
yield jobs.get().addCallback(async)
def main():
jobs = DeferredQueue()
for i in range(10):
jobs.put(i)
assign(jobs)
for i in range(3):
cooperate(worker(jobs))
reactor.run()
if __name__ == '__main__':
main()
pamiętać, że funkcja pracownik async
jest taki sam jak ten z pierwszego przykładu. Jednak tym razem dostępna jest również funkcja worker
, która jawnie odciąga zadania od DeferredQueue
i przetwarza je za pomocą async
(przez dodanie async
jako wywołania zwrotnego do Deferred
zwróconego przez get
). Generator worker
jest sterowany przez cooperate
, który iteruje go raz po każdym Deferred
powoduje pożary.Następnie główna pętla uruchamia trzy z tych generatorów pracowniczych, dzięki czemu w danym momencie trzy zadania będą w toku.
To podejście wymaga nieco więcej kodu niż podejście DeferredSemaphore
, ale ma pewne zalety, które mogą być interesujące. Po pierwsze, cooperate
zwraca instancję CooperativeTask
, która ma użyteczne metody, takie jak pause
, resume
i kilka innych. Ponadto wszystkie zadania przypisane temu samemu współpracownikowi będą ze sobą współpracować , aby nie przeciążać pętli zdarzeń (i to właśnie nadaje temu API jego nazwę). Po stronie DeferredQueue
można także ustawić limity liczby oczekujących na przetworzenie elementów, dzięki czemu można uniknąć całkowitego przeciążenia serwera (na przykład, jeśli procesor obrazu utknie i przestanie wykonywać zadania). Jeśli kod wywołujący put
obsługuje wyjątek przepełnienia kolejki, można go użyć jako ciśnienia, aby spróbować zatrzymać przyjmowanie nowych zadań (być może przestawiając je na inny serwer lub ostrzegając administratora). Robienie podobnych rzeczy z DeferredSemaphore
jest nieco trudniejsze, ponieważ nie ma możliwości ograniczenia liczby zadań oczekujących na uzyskanie semafora.
Fajnie, naprawdę doceniam te pomysły. W odpowiedzi na pomysł użycia DeferredSemaphore. Byłoby to bardzo przydatne, gdyby istniały odrębne partie zadań, które musiałyby zostać zakończone. Jeśli partia ma za dużo zadań do zrobienia, wykonuje tylko kilka zadań jednocześnie, a po zakończeniu wszystkich zadań partia jest gromadzona. Ma to wadę, że żadne wyniki nie są zwracane, dopóki cała partia nie zakończy się prawidłowo? I myślę, że ten minus jest rozwiązywany za pomocą DeferredQueue ... – agartland
Podejście z DeferredQueue i współpraca jest sprytna. To naprawdę da mi większą kontrolę w przyszłości, jeśli chodzi o skalowanie procesora. Nie sądzę nawet, że musi być bardziej skomplikowana. Dziękuję Ci. – agartland