2012-02-22 10 views
14

Chcę wiadomości procesowe w kilku wątkach ale dostaję błąd podczas wykonywania tego kodu:błąd „nieznany znacznik dostawy” występuje wtedy, gdy próbuję wiadomości ACK do RabbitMQ użyciu Pika (Python)

from __future__ import with_statement 
import pika 
import sys 
from pika.adapters.blocking_connection import BlockingConnection 
from pika import connection, credentials 
import time 
import threading 
import random 
from pika.adapters.select_connection import SelectConnection 
from pika.connection import Connection 
import traceback 


def doWork(body, args, channel): 


    r = random.random() 
    time.sleep(r * 10) 
    try:   
     channel.basic_ack(delivery_tag=args.delivery_tag) 

    except : 
     traceback.print_exc() 


auth = credentials.PlainCredentials(username="guest", password="guest") 
params = connection.ConnectionParameters(host="localhost", credentials=auth) 
conn = BlockingConnection(params) 
channel = conn.channel() 


while True: 

    time.sleep(0.03)  
    try: 

     method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
     if method_frame.NAME == 'Basic.GetEmpty': 
      continue   

     t = threading.Thread(target=doWork, args=[body, method_frame, channel]) 
     t.setDaemon(True) 
     t.start() 

    except Exception, e: 
     traceback.print_exc() 
     continue 

Error desctiption:

 
Traceback (most recent call last): 
    File "C:\work\projects\mq\start.py", line 43, in 
    method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get 
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack) 
    File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get 
    no_ack=no_ack)) 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method 
    self.connection.process_data_events() 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events 
    self._handle_read() 
    File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read 
    self._on_data_available(data) 
    File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available 
    frame)     # Args 
    File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process 
    callback(*args, **keywords) 
    File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close 
    frame.method.reply_text) 
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204') 

wersje: pika 0.9.5, 2.6.1 RabbitMQ

+0

Wczoraj próbowałem użyć PY-amqplib biblioteki zamiast pika. Wszystko działało dobrze. Prawdopodobnie jest problem w bibliotece pika. – solo117

+1

Jeśli chcesz udostępnić swój kod w wielu wątkach, powinieneś użyć biblioteki bezpiecznej dla wątków, takiej jak rabbitpy lub amqp-storm. Nie jestem pewien, czy py-amqplib jest bezpieczny dla wątków. https://github.com/eandersson/amqp-storm – eandersson

Odpowiedz

3

nie mam poprawkę, ale mogę sprawdzić, czy występuje przy użyciu adaptera BlockingConnection .

Konsekwentnie występuje przy potwierdzeniu lub odrzucenia polecenia, które ma być redelivered w odpowiedzi na channel.basic_recover()

pika RabbitMQ 0.9.5, 2.2.0, 2.7 pytona i Erlang R14B01

Obejście mam na miejscu jest zawsze określić deliver_tag = 0

podejrzewam, że to działa tylko wtedy, gdy wiadomość jesteś acking/nacking jest ostatni czytałeś (w strumieniu). Biblioteka, którą piszę, streszcza komunikat w taki sposób, że każdy może być uznany niezależnie, co zrywa z tym rozwiązaniem.

Czy ktoś może potwierdzić, czy zostało to już naprawione lub potwierdzone przez kogokolwiek z zespołu pika? Czy może to być problem z RabbitMQ?

+0

Widzę ten błąd z węzłem-amqp, więc musi to być problem z RabbitMQ (wersja 3.0.2-1). – alexfernandez

0

Po obejrzeniu RabbitMQ - upgraded to a new version and got a lot of "PRECONDITION_FAILED unknown delivery tag 1"

zmieniłem podstawowy-konsumować wyglądać tak:

consumer_tag = channel.basic_consume(
     message_delivery_event, 
     no_ack=True, 
     queue=queue, 
    ) 

To miało wpływ powodując opisany błąd na początkowych (nie redelivered) potwierdzeń, gdy tag dostawy depeszy został określony. Dostarczenie zostało wyodrębnione ze struktury metody dostarczania wiadomości.

Korzystanie

channel.basic_ack(delivery_tag=0) 

tłumi błąd w tym przypadku zbyt

Patrząc na http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-July/013664.html sprawia, że ​​wydaje się, że może to być problem w RabbitMQ.

26

Problemem prawdopodobnie jest to, że jesteś ustawienie no_ack=True tak:

consumer_tag = channel.basic_consume(
    message_delivery_event, 
    no_ack=True, 
    queue=queue, 
) 

A następnie potwierdzając komunikaty:

channel.basic_ack(delivery_tag=args.delivery_tag) 

Masz do wyboru, jeśli chcesz, aby potwierdzić lub nie i ustawić poprawny parametr konsumpcji.

+0

Podstawową przyczyną mojego kodu jest problem z synchronizacją i konfiguracja. Mam prosty wrap, aby utworzyć konsumenta rabbitmq. Podczas pobierania kolejki tymczasowej (channel.queueDeclare ("", false, true, true, args) .getQueue()), nextDelivery muszą być chronione za pomocą synchronizacji w środowisku wielowątkowym. Oznacza to, że jeśli dostaniesz wiadomość, musisz potwierdzić ją, zanim zużyjesz inną wiadomość. W przeciwnym razie, kiedy wywołasz ack, rzuci wyjątek i pozostawi wyjątek rzutu, gdy zużyje ... – DeepNightTwo

+0

To był dokładnie problem, który miałem, dziękuję bardzo. – Rob

+1

Otrzymałem ten błąd po słabym scaleniu, w którym wiadomość została odebrana dwukrotnie z tym samym znacznikiem dostarczania. – blockloop

2

Wystąpił błąd związany z kodem. Udostępniasz kanał między wątkami. To nie jest obsługiwane przez pika (zobacz FAQ).Masz 2 opcje:

  1. zdefiniować flagę no_ack=True w basic_get(...) i nie korzystania z przedmiotu kanału w funkcji wątku doWork(...)
  2. Jeśli trzeba ACK wiadomość dopiero po zakończeniu pracy, pozwól głównym wątek (pętla while True:) obsługuje komunikat ack (a nie wątek roboczy). Poniżej znajduje się zmodyfikowana wersja twojego kodu, która to robi.

    from __future__ import with_statement 
    import pika 
    import sys 
    from pika.adapters.blocking_connection import BlockingConnection 
    from pika import connection, credentials 
    import time 
    import threading 
    import random 
    from pika.adapters.select_connection import SelectConnection 
    from pika.connection import Connection 
    import traceback 
    from Queue import Queue, Empty 
    
    def doWork(body, args, channel, ack_queue): 
        time.sleep(random.random()) 
        ack_queue.put(args.delivery_tag) 
    
    def doAck(channel): 
        while True: 
         try: 
          r = ack_queue.get_nowait() 
         except Empty: 
          r = None 
         if r is None: 
          break 
         try: 
          channel.basic_ack(delivery_tag=r) 
         except: 
          traceback.print_exc() 
    
    auth = credentials.PlainCredentials(username="guest", password="guest") 
    params = connection.ConnectionParameters(host="localhost", credentials=auth) 
    conn = BlockingConnection(params) 
    channel = conn.channel() 
    # Create a queue for the messages that should be ACKed by main thread 
    ack_queue = Queue() 
    
    while True: 
        time.sleep(0.03)  
        try: 
         doAck(channel) 
         method_frame, header_frame, body = channel.basic_get(queue="test_queue") 
         if method_frame.NAME == 'Basic.GetEmpty': 
          continue   
         t = threading.Thread(target=doWork, args=[body, method_frame, channel, ack_queue]) 
         t.setDaemon(True) 
         t.start() 
        except Exception, e: 
         traceback.print_exc() 
         continue 
    
4

Dla mnie to było właśnie to powiedziałem kolejkę ja nie jechałem ack, potem potwierdzony.

E.g. ŹLE:

channel.basic_consume(callback, queue=queue_name, no_ack=True) 

a następnie w moim callback:

def callback(ch, method, properties, body): 
    # do stuff 
    ch.basic_ack(delivery_tag = method.delivery_tag) 

PRAWO:

channel.basic_consume(callback, queue=queue_name, no_ack=False) 

Konkluzja: Jeśli chcesz ręcznie ACK, ustaw no_ack = Fałszywy.

Od docs:

no_ack: (Bool) jeśli ma wartość true, tryb automatycznego potwierdzenia zostaną wykorzystane (patrz http://www.rabbitmq.com/confirms.html)

+0

Dziękuję. To było naprawdę przydatne. Problemem jest to, że nazwa parametru (no_ack lub noAck w .net) jest nieco myląca. Czuję, że powinno to być nazywane "ack", a jeśli spełnisz warunki, to potwierdzi wiadomość. –

Powiązane problemy