2015-03-30 9 views
7

Używam serwera WWW Tornado do ustawiania w kolejce elementów, które muszą być przetworzone poza cyklem żądania/odpowiedzi.Tworzenie kolejki przetwarzania w systemie Tornado

W poniższym uproszczonym przykładzie poniżej, za każdym razem, gdy przychodzi żądanie, dodaję nowy ciąg do listy o nazwie queued_items. Chcę stworzyć coś, co obejrzy tę listę i przetworzy elementy, które pojawią się w niej.

(W moim prawdziwym kodzie, elementy są przetwarzane i wysyłane przez gniazdo TCP, które może być połączone z serwerem sieciowym, gdy przychodzi żądanie sieci Web. Chcę, aby serwer WWW utrzymywał w kolejce pozycje bez względu na połączenie z gniazdem)

Próbuję zachować ten kod prosty i nie używać zewnętrznych kolejek/programów, takich jak Redis czy Beanstalk. Nie będzie miał bardzo dużej głośności.

Jaki jest dobry sposób używania idiomów Tornado do oglądania listy client.queued_items dla nowych przedmiotów i przetwarzania ich po ich otrzymaniu?

import time 

import tornado.ioloop 
import tornado.gen 
import tornado.web 

class Client(): 

    def __init__(self): 
     self.queued_items = [] 

    @tornado.gen.coroutine 
    def watch_queue(self): 
     # I have no idea what I'm doing 
     items = yield client.queued_items 
     # go_do_some_thing_with_items(items) 

class IndexHandler(tornado.web.RequestHandler): 

    def get(self): 
     client.queued_items.append("%f" % time.time()) 
     self.write("Queued a new item") 

if __name__ == "__main__": 

    client = Client() 

    # Watch the queue for when new items show up 
    client.watch_queue() 

    # Create the web server 
    application = tornado.web.Application([ 
     (r'/', IndexHandler), 
    ], debug=True) 

    application.listen(8888) 
    tornado.ioloop.IOLoop.instance().start() 

Odpowiedz

11

Jest biblioteka nazywa toro, który zapewnia prymitywów synchronizacji dla tornado. [Aktualizacja: Jak tornada 4.2 toro zostały połączone w tornado.]

Brzmi jak można po prostu użyć toro.Queue (lub tornado.queues.Queue w tornado wersji 4.2 lub nowszej), aby obsłużyć tego:

import time 

import toro 
import tornado.ioloop 
import tornado.gen 
import tornado.web 

class Client(): 

    def __init__(self): 
     self.queued_items = toro.Queue() 

    @tornado.gen.coroutine 
    def watch_queue(self): 
     while True: 
      items = yield self.queued_items.get() 
      # go_do_something_with_items(items) 

class IndexHandler(tornado.web.RequestHandler): 

    @tornado.gen.coroutine 
    def get(self): 
     yield client.queued_items.put("%f" % time.time()) 
     self.write("Queued a new item") 

if __name__ == "__main__": 

    client = Client() 

    # Watch the queue for when new items show up 
    tornado.ioloop.IOLoop.instance().add_callback(client.watch_queue) 

    # Create the web server 
    application = tornado.web.Application([ 
     (r'/', IndexHandler), 
    ], debug=True) 

    application.listen(8888) 
    tornado.ioloop.IOLoop.instance().start() 

Nie są kilka poprawek wymaga, oprócz przełączania strukturę danych z listy do toro.Queue:

  1. Musimy zaplanować watch_queue, aby uruchomić wewnątrz IOLoop przy użyciu add_callback, zamiast próbować wywoływać go bezpośrednio poza kontekstem IOLoop.
  2. IndexHandler.get Konwertuje się na coroutine, ponieważ toro.Queue.put jest coroutine.

Dodałem też while True pętli watch_queue, tak, że będzie działać na zawsze, a nie tylko jeden element przetwarzania i wyjście.

+0

To jest dokładnie to, czego potrzebowałem. Dzięki za pokazanie mi, jak to zrobić. – Scott

+0

Dano - Jak mogę przestać oglądać kolejkę? Gdy moje połączenie zepsuje się, będę musiał tymczasowo przestać przetwarzać elementy w kolejce, ale ich nie zgubić. – Scott

+1

toro zostało połączone w tornado i jest teraz przestarzałe. Dla tornado> = 4.2 możesz użyć 'tornado.queues.Queue' –

Powiązane problemy