2012-02-29 12 views
12

Używam Pika do przetwarzania danych z RabbitMQ. Jak mi się zdawało, że natrafiłem na różne problemy, postanowiłem napisać małą aplikację testową, aby zobaczyć, jak poradzę sobie z rozłączeniami.Strategia RabbitMQ, Pika i ponowne połączenie

Napisałem tę aplikację testową, która ma następujący:

  1. Połącz Broker, aż ponownie udanej
  2. Po podłączeniu utworzyć kolejkę.
  3. Zużyj tę kolejkę i umieść wynik w pytonie Queue.Queue (0)
  4. Pobierz element z Queue.Queue (0) i uruchom go ponownie w kolejce brokera.

Co zauważyłem było 2 problemy:

  1. Kiedy uruchomić mój skrypt z jednego hosta łączącego do RabbitMQ na innego hosta (wewnątrz VM), to skrypty wychodzi na losowych momentach bez wytwarzania błędu.
  2. Po uruchomieniu skryptu na tym samym komputerze, na którym jest zainstalowany RabbitMQ, działa poprawnie i działa.

Można to wyjaśnić z powodu problemów z siecią, pakiety zostały zrzucone, chociaż uważam, że połączenie nie jest naprawdę solidne.

Po uruchomieniu skryptu lokalnie na serwerze RabbitMQ i zabiję RabbitMQ następnie skrypt kończy działanie z błędem: „ERROR pika SelectConnection: Gniazdo Błąd 3: 104”

Wygląda więc na to, że nie można dostać strategia ponownego podłączenia działa tak, jak powinna. Czy ktoś może rzucić okiem na kod, więc zobacz, co robię źle?

Dzięki,

Jay

#!/bin/python 
import logging 
import threading 
import Queue 
import pika 
from pika.reconnection_strategies import SimpleReconnectionStrategy 
from pika.adapters import SelectConnection 
import time 
from threading import Lock 

class Broker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.logging = logging.getLogger(__name__) 
     self.to_broker = Queue.Queue(0) 
     self.from_broker = Queue.Queue(0) 
     self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True) 
     self.srs = SimpleReconnectionStrategy() 
     self.properties = pika.BasicProperties(delivery_mode=2) 

     self.connection = None 
     while True: 
      try: 
       self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs) 
       break 
      except Exception as err: 
       self.logging.warning('Cant connect. Reason: %s' % err) 
       time.sleep(1) 

     self.daemon=True 
    def run(self): 
     while True: 
      self.submitData(self.from_broker.get(block=True)) 
     pass 
    def on_connected(self,connection): 
     connection.channel(self.on_channel_open) 
    def on_channel_open(self,new_channel): 
     self.channel = new_channel 
     self.channel.queue_declare(queue='sandbox', durable=True) 
     self.channel.basic_consume(self.processData, queue='sandbox')  
    def processData(self, ch, method, properties, body): 
     self.logging.info('Received data from broker') 
     self.channel.basic_ack(delivery_tag=method.delivery_tag) 
     self.from_broker.put(body) 
    def submitData(self,data): 
     self.logging.info('Submitting data to broker.') 
     self.channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=self.properties) 
if __name__ == '__main__': 
    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 
    broker=Broker() 
    broker.start() 
    try: 
     broker.connection.ioloop.start() 
    except Exception as err: 
     print err 

Odpowiedz

17

Głównym problemem ze skryptu jest to, że interakcje z pojedynczym kanałem zarówno z głównego wątku (gdzie ioloop jest uruchomiony) i „Broker” wątek (wywołuje submitData w pętli). To jest not safe.

Również, SimpleReconnectionStrategy wydaje się nie robić nic użytecznego. Nie powoduje ponownego połączenia, jeśli połączenie zostanie przerwane. Wydaje mi się, że jest to błąd w Pika: https://github.com/pika/pika/issues/120

Próbowałem zmienić kod, aby działał tak jak myślę, że chciałeś, ale wpadł na inny problem. Wygląda na to, że Pika nie ma sposobu na wykrycie awarii dostawy, co oznacza, że ​​dane mogą zostać utracone, jeśli połączenie zostanie przerwane. To wydaje się być tak oczywistym wymogiem! Jak nie można znaleźć sposobu, aby wykryć, że błąd nie powiodł się? Próbowałem różnych rzeczy, w tym transakcji i add_on_return_callback (z których wszystkie wydawały się niezgrabne i zbyt skomplikowane), ale nic nie znalazłem. Jeśli tak naprawdę nie ma mowy, to tylko pika wydaje się być przydatna w sytuacjach, które mogą tolerować utratę danych wysłanych do RabbitMQ, lub w programach, które potrzebują skonsumować tylko od RabbitMQ.

To nie jest niezawodne, ale w celach informacyjnych, oto niektóre kod, który rozwiązuje problem wielowątkowego:

import logging 
import pika 
import Queue 
import sys 
import threading 
import time 
from functools import partial 
from pika.adapters import SelectConnection, BlockingConnection 
from pika.exceptions import AMQPConnectionError 
from pika.reconnection_strategies import SimpleReconnectionStrategy 

log = logging.getLogger(__name__) 

DEFAULT_PROPERTIES = pika.BasicProperties(delivery_mode=2) 


class Broker(object): 

    def __init__(self, parameters, on_channel_open, name='broker'): 
     self.parameters = parameters 
     self.on_channel_open = on_channel_open 
     self.name = name 

    def connect(self, forever=False): 
     name = self.name 
     while True: 
      try: 
       connection = SelectConnection(
        self.parameters, self.on_connected) 
       log.debug('%s connected', name) 
      except Exception: 
       if not forever: 
        raise 
       log.warning('%s cannot connect', name, exc_info=True) 
       time.sleep(10) 
       continue 

      try: 
       connection.ioloop.start() 
      finally: 
       try: 
        connection.close() 
        connection.ioloop.start() # allow connection to close 
       except Exception: 
        pass 

      if not forever: 
       break 

    def on_connected(self, connection): 
     connection.channel(self.on_channel_open) 


def setup_submitter(channel, data_queue, properties=DEFAULT_PROPERTIES): 
    def on_queue_declared(frame): 
     # PROBLEM pika does not appear to have a way to detect delivery 
     # failure, which means that data could be lost if the connection 
     # drops... 
     channel.confirm_delivery(on_delivered) 
     submit_data() 

    def on_delivered(frame): 
     if frame.method.NAME in ['Confirm.SelectOk', 'Basic.Ack']: 
      log.info('submission confirmed %r', frame) 
      # increasing this value seems to cause a higher failure rate 
      time.sleep(0) 
      submit_data() 
     else: 
      log.warn('submission failed: %r', frame) 
      #data_queue.put(...) 

    def submit_data(): 
     log.info('waiting on data queue') 
     data = data_queue.get() 
     log.info('got data to submit') 
     channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=properties, 
        mandatory=True) 
     log.info('submitted data to broker') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


def blocking_submitter(parameters, data_queue, 
     properties=DEFAULT_PROPERTIES): 
    while True: 
     try: 
      connection = BlockingConnection(parameters) 
      channel = connection.channel() 
      channel.queue_declare(queue='sandbox', durable=True) 
     except Exception: 
      log.error('connection failure', exc_info=True) 
      time.sleep(1) 
      continue 
     while True: 
      log.info('waiting on data queue') 
      try: 
       data = data_queue.get(timeout=1) 
      except Queue.Empty: 
       try: 
        connection.process_data_events() 
       except AMQPConnectionError: 
        break 
       continue 
      log.info('got data to submit') 
      try: 
       channel.basic_publish(exchange='', 
          routing_key='sandbox', 
          body=data, 
          properties=properties, 
          mandatory=True) 
      except Exception: 
       log.error('submission failed', exc_info=True) 
       data_queue.put(data) 
       break 
      log.info('submitted data to broker') 


def setup_receiver(channel, data_queue): 
    def process_data(channel, method, properties, body): 
     log.info('received data from broker') 
     data_queue.put(body) 
     channel.basic_ack(delivery_tag=method.delivery_tag) 

    def on_queue_declared(frame): 
     channel.basic_consume(process_data, queue='sandbox') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


if __name__ == '__main__': 
    if len(sys.argv) != 2: 
     print 'usage: %s RABBITMQ_HOST' % sys.argv[0] 
     sys.exit() 

    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 

    host = sys.argv[1] 
    log.info('connecting to host: %s', host) 
    parameters = pika.ConnectionParameters(host=host, heartbeat=True) 
    data_queue = Queue.Queue(0) 
    data_queue.put('message') # prime the pump 

    # run submitter in a thread 

    setup = partial(setup_submitter, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'submitter') 
    thread = threading.Thread(target= 
     partial(broker.connect, forever=True)) 

    # uncomment these lines to use the blocking variant of the submitter 
    #thread = threading.Thread(target= 
    # partial(blocking_submitter, parameters, data_queue)) 

    thread.daemon = True 
    thread.start() 

    # run receiver in main thread 
    setup = partial(setup_receiver, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'receiver') 
    broker.connect(forever=True) 
+0

Dzięki za poświęcenie czasu przeżywa kodu i znalezienie wszystkie problemy z nim związane. Obecnie używam http://barryp.org/software/py-amqplib/, co jest imo bardziej podstawową/prostszą biblioteką, ale całkowicie odpowiada moim potrzebom. W połączeniu z geventem mam naprawdę dobre rezultaty. Teraz nie przeszkadzam już w Pice. –

+1

możesz użyć Channel.confirm_delivery(), aby poczekać na potwierdzenie po opublikowaniu, gdy połączenie zostanie zamknięte, upłynie limit czasu, wtedy będziesz wiedział, że wiadomość nie została dostarczona do brokera –

Powiązane problemy