2010-05-18 11 views
11

Siłą Twisted (dla Pythona) jest jego asynchroniczny framework (chyba). Napisałem serwer przetwarzania obrazu, który pobiera żądania za pośrednictwem Brokera Perspective. Działa tak dobrze, jak długo karmię go mniej niż kilkaset obrazów naraz. Czasami jednak dochodzi do tego w setkach obrazów praktycznie w tym samym czasie. Ponieważ próbuje je wszystkie przetwarzać jednocześnie, serwer ulega awarii.Zdalne wywoływanie w kolejce do brokera perspektywy Pythona?

Jako rozwiązanie chciałbym umieścić w kolejce remote_calls na serwerze, aby przetwarzał tylko 100 obrazów na raz. Wygląda na to, że może to być coś, co Twisted już robi, ale nie mogę tego znaleźć. Wszelkie pomysły, jak rozpocząć wdrażanie tego? Punkt we właściwym kierunku? Dzięki!

Odpowiedz

29

Jedną z gotowych opcji, która może w tym pomóc jest twisted.internet.defer.DeferredSemaphore. Jest to asynchroniczna wersja normalnego (liczącego) semafora, który możesz już znać, jeśli zrobiłeś wiele programów z gwintami.

Semafor (zliczający) przypomina trochę muteks (blokada). Ale tam, gdzie mutex może być pozyskany tylko raz, aż do wydania odpowiedniego, semafor (zliczający) może być skonfigurowany, aby umożliwić dowolne (ale określone) liczby przejęć, aby odnieść sukces zanim jakiekolwiek odpowiednie wydania są wymagane.

Oto przykład użycia DeferredSemaphore uruchomić dziesięć operacji asynchronicznych, ale by uruchomić co najwyżej trzy z nich na raz:

from twisted.internet.defer import DeferredSemaphore, gatherResults 
from twisted.internet.task import deferLater 
from twisted.internet import reactor 


def async(n): 
    print 'Starting job', n 
    d = deferLater(reactor, n, lambda: None) 
    def cbFinished(ignored): 
     print 'Finishing job', n 
    d.addCallback(cbFinished) 
    return d 


def main(): 
    sem = DeferredSemaphore(3) 

    jobs = [] 
    for i in range(10): 
     jobs.append(sem.run(async, i)) 

    d = gatherResults(jobs) 
    d.addCallback(lambda ignored: reactor.stop()) 
    reactor.run() 


if __name__ == '__main__': 
    main() 

DeferredSemaphore ma również wyraźne acquire i release metod, ale metoda run jest tak wygodny prawie zawsze jest to, czego chcesz. Wywołuje metodę acquire, która zwraca wartość Deferred. Do tego pierwszego Deferred dodaje wywołanie zwrotne, które wywołuje funkcję, którą przekazałeś (wraz z dowolnymi argumentami pozycyjnymi lub słowami kluczowymi). Jeśli ta funkcja zwróci Deferred, to do tego drugiego Deferred zostanie dodane wywołanie zwrotne, które wywołuje metodę release.

Sprawa synchroniczna jest obsługiwana również przez natychmiastowe wywołanie release. Błędy są również obsługiwane, umożliwiając im propagowanie, ale upewniając się, że konieczne jest wykonanie wymaganego release, aby pozostawić spójny stan DeferredSemaphore. Wynik funkcji przekazanej do run (lub wyniku, który zwraca Deferred) staje się wynikiem wartości Deferred zwróconej przez run.

Inne możliwe podejście może być oparte na DeferredQueue i cooperate. DeferredQueue jest zwykle jak normalna kolejka, ale jego metoda get zwraca wartość Deferred. Jeśli w momencie połączenia nie ma żadnych elementów w kolejce, Deferred nie zostanie uruchomiony, dopóki element nie zostanie dodany.

Oto przykład:

from random import randrange 

from twisted.internet.defer import DeferredQueue 
from twisted.internet.task import deferLater, cooperate 
from twisted.internet import reactor 


def async(n): 
    print 'Starting job', n 
    d = deferLater(reactor, n, lambda: None) 
    def cbFinished(ignored): 
     print 'Finishing job', n 
    d.addCallback(cbFinished) 
    return d 


def assign(jobs): 
    # Create new jobs to be processed 
    jobs.put(randrange(10)) 
    reactor.callLater(randrange(10), assign, jobs) 


def worker(jobs): 
    while True: 
     yield jobs.get().addCallback(async) 


def main(): 
    jobs = DeferredQueue() 

    for i in range(10): 
     jobs.put(i) 

    assign(jobs) 

    for i in range(3): 
     cooperate(worker(jobs)) 

    reactor.run() 


if __name__ == '__main__': 
    main() 

pamiętać, że funkcja pracownik async jest taki sam jak ten z pierwszego przykładu. Jednak tym razem dostępna jest również funkcja worker, która jawnie odciąga zadania od DeferredQueue i przetwarza je za pomocą async (przez dodanie async jako wywołania zwrotnego do Deferred zwróconego przez get). Generator worker jest sterowany przez cooperate, który iteruje go raz po każdym Deferred powoduje pożary.Następnie główna pętla uruchamia trzy z tych generatorów pracowniczych, dzięki czemu w danym momencie trzy zadania będą w toku.

To podejście wymaga nieco więcej kodu niż podejście DeferredSemaphore, ale ma pewne zalety, które mogą być interesujące. Po pierwsze, cooperate zwraca instancję CooperativeTask, która ma użyteczne metody, takie jak pause, resume i kilka innych. Ponadto wszystkie zadania przypisane temu samemu współpracownikowi będą ze sobą współpracować , aby nie przeciążać pętli zdarzeń (i to właśnie nadaje temu API jego nazwę). Po stronie DeferredQueue można także ustawić limity liczby oczekujących na przetworzenie elementów, dzięki czemu można uniknąć całkowitego przeciążenia serwera (na przykład, jeśli procesor obrazu utknie i przestanie wykonywać zadania). Jeśli kod wywołujący put obsługuje wyjątek przepełnienia kolejki, można go użyć jako ciśnienia, aby spróbować zatrzymać przyjmowanie nowych zadań (być może przestawiając je na inny serwer lub ostrzegając administratora). Robienie podobnych rzeczy z DeferredSemaphore jest nieco trudniejsze, ponieważ nie ma możliwości ograniczenia liczby zadań oczekujących na uzyskanie semafora.

+0

Fajnie, naprawdę doceniam te pomysły. W odpowiedzi na pomysł użycia DeferredSemaphore. Byłoby to bardzo przydatne, gdyby istniały odrębne partie zadań, które musiałyby zostać zakończone. Jeśli partia ma za dużo zadań do zrobienia, wykonuje tylko kilka zadań jednocześnie, a po zakończeniu wszystkich zadań partia jest gromadzona. Ma to wadę, że żadne wyniki nie są zwracane, dopóki cała partia nie zakończy się prawidłowo? I myślę, że ten minus jest rozwiązywany za pomocą DeferredQueue ... – agartland

+1

Podejście z DeferredQueue i współpraca jest sprytna. To naprawdę da mi większą kontrolę w przyszłości, jeśli chodzi o skalowanie procesora. Nie sądzę nawet, że musi być bardziej skomplikowana. Dziękuję Ci. – agartland

-2

Być może spodoba Ci się również txRDQ (Resizable Dispatch Queue), którą napisałem. Google to, jest w kolekcji tx na LaunchPad. Przepraszam, nie mam więcej czasu na odpowiedź - o tym, by wejść na scenę.

Terry

Powiązane problemy