2013-07-09 10 views
5

Adapterem Tornado obsługującym bibliotekę Pika, here jest przykładowy sposób publikowania wiadomości za pomocą adaptera asynchronicznego.Jak komunikować się z RabbitMQ (biblioteka Pika) w aplikacji tornado

Chcę użyć pika w aplikacji tornado, tylko przykład, chcę umieścić dane żądania tornado do RabbitMQ, Ale nie wiem jak to zrobić.

Dwa pytania nie wiem, jak rozwiązać.

1 Pika adapter Zastosowanie tornado posiada własną ioloop,

self._connection = pika.SelectConnection(pika.URLParameters(self._url), 
             self.on_connection_open) 
self._connection.ioloop.start() 

aplikacja Tornado posiada własną ioloop,

tornado.ioloop.IOLoop.instance().start() 

Jak połączyć te dwa ioloop?

2 Przykład Pika publikuje tę samą wiadomość raz za razem, ale chcę opublikować dane żądania, jak przekazać dane żądania do metody publikacji?

Odpowiedz

6

Po wyszukaniu dokładnie tego samego, znalazłem to blog post of Kevin Jing Qiu.

Poszedłem do dziury królika nieco dalej, aby każdy websocket miał swój własny zestaw kanałów i kolejek.

Fragment z mojego projektu można znaleźć poniżej. Aplikacja tornado związana z RabbitMQ składa się z następujących części:

  1. Aplikacja Tornado, która obsługuje żądania stron internetowych. Widzę tu tylko długowieczne websockets, ale możesz to zrobić również dzięki krótkim żądaniom http.
  2. Połączenie (jedno) RabbitMQ przez instancję PikaClient
  3. połączenie internetowe definiujące kanały, kolejki i giełdy po uruchomieniu metody otwartej.

Teraz połączenie internetowe może odbierać dane z tornada (dane z przeglądarki) za pośrednictwem wiadomości on_message i wysyłać je do RabbitMQ.

Połączenie websocket otrzyma dane z RabbitMQ via basic_consume.

To nie jest w pełni funkcjonalne, ale powinieneś wziąć ten pomysł.

class PikaClient(object): 

    def __init__(self, io_loop): 
     logger.info('PikaClient: __init__') 
     self.io_loop = io_loop 

     self.connected = False 
     self.connecting = False 
     self.connection = None 
     self.channel = None 
     self.message_count = 0 
    """ 
    Pika-Tornado connection setup 
    The setup process is a series of callback methods. 
    connect:connect to rabbitmq and build connection to tornado io loop -> 
    on_connected: create a channel to rabbitmq -> 
    on_channel_open: declare queue tornado, bind that queue to exchange 
        chatserver_out and start consuming messages. 
    """ 

    def connect(self): 
     if self.connecting: 
      #logger.info('PikaClient: Already connecting to RabbitMQ') 
      return 

     #logger.info('PikaClient: Connecting to RabbitMQ') 
     self.connecting = True 

     cred = pika.PlainCredentials('guest', 'guest') 
     param = pika.ConnectionParameters(
      host='localhost', 
      port=5672, 
      virtual_host='/', 
      credentials=cred 
     ) 
     self.connection = TornadoConnection(param, 
      on_open_callback=self.on_connected,stop_ioloop_on_close=False) 
     self.connection.add_on_close_callback(self.on_closed) 

    def on_connected(self, connection): 
     logger.info('PikaClient: connected to RabbitMQ') 
     self.connected = True 
     self.connection = connection 
     # now you are able to call the pika api to do things 
     # this could be exchange setup for websocket connections to 
     # basic_publish to later. 
     self.connection.channel(self.on_channel_open) 

    def on_channel_open(self, channel): 
     logger.info('PikaClient: Channel %s open, Declaring exchange' % channel) 
     self.channel = channel 

    def on_closed(self, connection): 
     logger.info('PikaClient: rabbit connection closed') 
     self.io_loop.stop() 


class MyWebSocketHandler(websocket.WebSocketHandler): 
    def __init__(self): 
     self.status = 'not connected yet' 

    def open(self, *args, **kwargs): 
     self.status = "ws open" 
     self.rabbit_connect() # connect this websocket object to rabbitmq 

    def rabbit_connect(): 
     self.application.pc.connection.channel(self.rabbit_channel_in_ok) 

    def rabbit_channel_in_ok(self,channel): 
     self.channel_in = channel 
     self.channel_in.queue_declare(self.rabbit_declare_ok, 
             exclusive=True,auto_delete=True) 


# and so on... 


handlers = [ your_definitions_here_like_websockets_or_such ] 
settings = { your_settings_here } 
application = tornado.web.Application(handlers,**settings) 

def main(): 
    io_loop = tornado.ioloop.IOLoop.instance() 
    # PikaClient is our rabbitmq consumer 
    pc = PikaClient(io_loop) 
    application.pc = pc 
    application.pc.connect() 
    application.listen(config.tornadoport) 
    try: 
     io_loop.start() 
    except KeyboardInterrupt: 
     io_loop.stop() 

if __name__ == '__main__': 
    main() 
+0

Podany link jest nieaktualny. – FactualHarmony

+0

dzięki. i naprawione. – itsafire

Powiązane problemy