Używam Pika do przetwarzania danych z RabbitMQ. Jak mi się zdawało, że natrafiłem na różne problemy, postanowiłem napisać małą aplikację testową, aby zobaczyć, jak poradzę sobie z rozłączeniami.Strategia RabbitMQ, Pika i ponowne połączenie
Napisałem tę aplikację testową, która ma następujący:
- Połącz Broker, aż ponownie udanej
- Po podłączeniu utworzyć kolejkę.
- Zużyj tę kolejkę i umieść wynik w pytonie Queue.Queue (0)
- Pobierz element z Queue.Queue (0) i uruchom go ponownie w kolejce brokera.
Co zauważyłem było 2 problemy:
- Kiedy uruchomić mój skrypt z jednego hosta łączącego do RabbitMQ na innego hosta (wewnątrz VM), to skrypty wychodzi na losowych momentach bez wytwarzania błędu.
- Po uruchomieniu skryptu na tym samym komputerze, na którym jest zainstalowany RabbitMQ, działa poprawnie i działa.
Można to wyjaśnić z powodu problemów z siecią, pakiety zostały zrzucone, chociaż uważam, że połączenie nie jest naprawdę solidne.
Po uruchomieniu skryptu lokalnie na serwerze RabbitMQ i zabiję RabbitMQ następnie skrypt kończy działanie z błędem: „ERROR pika SelectConnection: Gniazdo Błąd 3: 104”
Wygląda więc na to, że nie można dostać strategia ponownego podłączenia działa tak, jak powinna. Czy ktoś może rzucić okiem na kod, więc zobacz, co robię źle?
Dzięki,
Jay
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.logging = logging.getLogger(__name__)
self.to_broker = Queue.Queue(0)
self.from_broker = Queue.Queue(0)
self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
self.srs = SimpleReconnectionStrategy()
self.properties = pika.BasicProperties(delivery_mode=2)
self.connection = None
while True:
try:
self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
break
except Exception as err:
self.logging.warning('Cant connect. Reason: %s' % err)
time.sleep(1)
self.daemon=True
def run(self):
while True:
self.submitData(self.from_broker.get(block=True))
pass
def on_connected(self,connection):
connection.channel(self.on_channel_open)
def on_channel_open(self,new_channel):
self.channel = new_channel
self.channel.queue_declare(queue='sandbox', durable=True)
self.channel.basic_consume(self.processData, queue='sandbox')
def processData(self, ch, method, properties, body):
self.logging.info('Received data from broker')
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self.from_broker.put(body)
def submitData(self,data):
self.logging.info('Submitting data to broker.')
self.channel.basic_publish(exchange='',
routing_key='sandbox',
body=data,
properties=self.properties)
if __name__ == '__main__':
format=('%(asctime)s %(levelname)s %(name)s %(message)s')
logging.basicConfig(level=logging.DEBUG, format=format)
broker=Broker()
broker.start()
try:
broker.connection.ioloop.start()
except Exception as err:
print err
Dzięki za poświęcenie czasu przeżywa kodu i znalezienie wszystkie problemy z nim związane. Obecnie używam http://barryp.org/software/py-amqplib/, co jest imo bardziej podstawową/prostszą biblioteką, ale całkowicie odpowiada moim potrzebom. W połączeniu z geventem mam naprawdę dobre rezultaty. Teraz nie przeszkadzam już w Pice. –
możesz użyć Channel.confirm_delivery(), aby poczekać na potwierdzenie po opublikowaniu, gdy połączenie zostanie zamknięte, upłynie limit czasu, wtedy będziesz wiedział, że wiadomość nie została dostarczona do brokera –