2011-08-15 10 views
32

Wydaje mi się, że im dłużej utrzymuję mój serwer rabbitmq, tym więcej mam kłopotów z niepotwierdzonymi wiadomościami. Chciałbym je im przypomnieć. W rzeczywistości wydaje się, że jest to polecenie amqp, ale dotyczy to tylko kanału, z którego korzysta twoje połączenie. I zbudował mały pika skrypt przynajmniej spróbować go, ale ja też czegoś brakuje lub nie można zrobić w ten sposób (jak o z rabbitmqctl?)Jak mogę odzyskać niepotwierdzone wiadomości AMQP z innych kanałów niż moje własne połączenie?

import pika 

credentials = pika.PlainCredentials('***', '***') 
parameters = pika.ConnectionParameters(host='localhost',port=5672,\ 
    credentials=credentials, virtual_host='***') 

def handle_delivery(body): 
    """Called when we receive a message from RabbitMQ""" 
    print body 

def on_connected(connection): 
    """Called when we are fully connected to RabbitMQ""" 
    connection.channel(on_channel_open)  

def on_channel_open(new_channel): 
    """Called when our channel has opened""" 
    global channel 
    channel = new_channel 
    channel.basic_recover(callback=handle_delivery,requeue=True)  

try: 
    connection = pika.SelectConnection(parameters=parameters,\ 
     on_open_callback=on_connected)  

    # Loop so we can communicate with RabbitMQ 
    connection.ioloop.start() 
except KeyboardInterrupt: 
    # Gracefully close the connection 
    connection.close() 
    # Loop until we're fully closed, will stop on its own 
    connection.ioloop.start() 
+0

Czy udało się rozwiązać ten problem? – 13hsoj

+0

https://stackoverflow.com/questions/8296201/when-does-an-amqp-rabbitmq-channel-with-no-connections-die Odpowiedź na to pytanie jest potencjalnie potrzebna w zależności od tego, dlaczego inne kanały wciąż mają problemy niepakowane wiadomości. Kanały zombie. Nie dup, ponieważ ten temat dotyczy wiadomości w innych kanałach, a nie samych kanałów. –

Odpowiedz

45

niepotwierdzone komunikaty są te, które zostały dostarczone w poprzek sieci do konsumenta, ale nie zostały jeszcze odrzucone lub odrzucone - ale konsument nie zamknął jeszcze kanału lub połączenia, z którego pierwotnie je otrzymał. Dlatego pośrednik nie może dowiedzieć się, czy konsument po prostu zajmuje dużo czasu na przetworzenie tych wiadomości lub jeśli o nich zapomniał. Pozostawia ich zatem w nie potwierdzonym stanie, dopóki konsument nie umrze, albo nie zostanie odebrany lub odrzucony.

Ponieważ wiadomości te mogą nadal być prawidłowo przetwarzane w przyszłości przez wciąż żyjącego konsumenta, który je zużył, nie można (według mojej wiedzy) włożyć do niego innego konsumenta i podejmować na jego podstawie zewnętrznych decyzji. Musisz naprawić swoich klientów, aby podejmowali decyzje dotyczące każdej wiadomości, ponieważ są przetwarzane, a nie pozostawiają starych wiadomości niepotwierdzonych.

+0

, więc basic.recover _must_ będzie wywoływany przez konsumenta? Używam seleryd do zarządzania połączeniami. może być możliwe wysłanie tego polecenia do słabo reagujących kolejek za pomocą celeryctl (jeśli wiesz, że ...) –

+3

@ wykorzystam moje kondolencje, że używasz Celery. Twórcy selera po prostu nie rozumieją AMQP i stworzyli źle zepsutą implementację. Musisz dokonać wyboru, albo pozbyć się selera i zrobić AMQP w prawo, albo przestać używać AMQP z selerem i użyć czegoś prostego jak Redis. Zdecydowałem się rzucić seler i zostać z AMQP. –

+6

to dość oskarżenie. jeśli nie masz nic przeciwko temu, że pytam, to co z implementacją AMQP selera nie jest wykonane poprawnie? –

10

Jeśli wiadomości są unacked istnieją tylko dwa sposoby, aby je z powrotem do kolejki:

  1. basic.nack

    To polecenie spowoduje wiadomość zostać ponownie umieszczonym w kolejce i ponownie dostarczony.

  2. Odłączyć od brokera

    Akcja ta zmusi wszystkie unacked wiadomości z tego kanału należy umieścić z powrotem w kolejce.

UWAGA: basic.recover postaram się opublikować unacked wiadomości na tym samym kanale (do tego samego konsumenta), co jest czasami pożądane zachowanie.

RabbitMQ spec for basic.recover and basic.nack


Prawdziwe pytanie brzmi: Dlaczego komunikaty niepotwierdzone?

Możliwe scenariusze: Pobieranie

  1. Consumer zbyt wielu wiadomości, a następnie nie przetwarza i acking im wystarczająco szybko.

    Rozwiązanie: Należy wstępnie pobrać jak najmniejszą liczbę wiadomości.

  2. Buggy biblioteki klienta (mam ten problem obecnie z pika 0.9.13. Jeśli kolejka ma dużo wiadomości, pewna liczba komunikatów będzie utknąć unacked nawet godziny później.

    Rozwiązanie : Muszę ponownie uruchomić klienta kilka razy, aż wszystkie nieakceptowane wiadomości znikną z kolejki.

+0

Czy zgłoszono twój problem z pika? Czy możesz podać link? – istepaniuk

+0

To limit rekursji Pythona, który się pojawił. Coś o tym, że nie można powtórzyć> 1000 razy, co najwyraźniej działo się z pika 0.9.13. Nie widząc go z 0.9.14. – IvanD

+3

Na koniec znaleziono miejsce zgłoszenia problemu: https://github.com/pika/pika/issues/286 – IvanD

2

Wszystkie niepotwierdzone wiadomości przejdą w stan gotowości po zatrzymaniu wszystkich pracowników/konsumentów.

Upewnij się, że wszyscy pracownicy są zatrzymani, potwierdzając dane wyjściowe grep na ps aux i zatrzymując/zabijając je, jeśli je znaleziono.

Jeśli zarządzasz pracownikami za pomocą przełożonego, który pokazuje, że pracownik jest zatrzymany, możesz chcieć sprawdzić obecność zombie. Supervisor informuje pracownika, że ​​ma zostać zatrzymany, ale nadal znajdziesz procesy zombie uruchomione, gdy grepped na wyjściu PS aux. Zabicie procesów zombie przywróci komunikaty do stanu gotowości.

+0

Możesz także sprawdzić, czy połączenie z królikiem jest wstrzymywane przez proces zombie, korzystając z konsoli zarządzania RabbitMQ, Opisałem tutaj: http://stackoverflow.com/questions/11926077/rabbitmq-messages-remain-unackgotledged/43026774#43026774 –

Powiązane problemy