2013-07-15 9 views
11

Mam utworzyć prostego wydawcy i konsumenta, który subskrybuje w kolejce przy użyciu basic.consume.Zużywające nie potwierdzam wiadomości od RabbitMq

Moi klienci potwierdzają wiadomości, gdy zadanie jest uruchamiane bez wyjątku. Ilekroć napotykam wyjątek, nie potwierdzam wiadomości i wracam wcześniej. Tylko potwierdzone wiadomości znikają z kolejki, więc to działa poprawnie.
Teraz chcę, aby konsument ponownie odebrał nieudane komunikaty, ale jedynym sposobem, aby je ponownie przeanalizować, jest ponowne uruchomienie klienta.

Jak mam podejść do tego przypadku użycia?

kod instalacyjny

$channel = new AMQPChannel($connection); 

$exchange = new AMQPExchange($channel); 

$exchange->setName('my-exchange'); 
$exchange->setType('fanout'); 
$exchange->declare(); 

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declare(); 
$queue->bind('my-exchange'); 

kod Consumer

$queue->consume(array($this, 'callback')); 

public function callback(AMQPEnvelope $msg) 
{ 
    try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return; 
    } 
    return $queue->ack($msg->getDeliveryTag()); 
} 

kod Producent

$exchange->publish('message'); 
+0

W jakim języku się posługujesz i czy możesz podać kod? – pinepain

+0

@ zaq178miami, zobacz moją zredagowaną wiadomość –

+0

@Bram_Gerritsen, zobacz moją odpowiedź aktualizacja – pinepain

Odpowiedz

15

Jeśli wiadomość w ponieważ nie zostanie potwierdzone, a aplikacja nie powiedzie się, zostanie automatycznie ponownie dostarczona i właściwość na kopercie zostanie ustawiona na true (chyba, że ​​użytkownik ją zużyje z flagą no-ack = true).

UPD:

Trzeba nack wiadomości z flagą redystrybucji w bloku catch

try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); 
    } 

Strzeż nieskończenie Nacked wiadomości natomiast liczba ponownej dostawy nie zaimplementowane w RabbitMQ aw protokole AMQP w ogóle.

Jeśli nie chce zadzierać z takich wiadomości i po prostu chcą, aby dodać pewne opóźnienie może chcesz dodać trochę sleep() lub usleep() przed nack wywołanie metody, ale nie jest to dobry pomysł w ogóle.

Istnieje wiele technik radzenia sobie z problemem redeliver cyklu:

1. polegać na Dead Letter Exchanges

  • Plusy: solidność, STANDARD, jasnych
  • Wady: wymaga dodatkowej logiki

2.Użyj per message or per queue TTL

  • Zalety: łatwe do wdrożenia, a także standardowe, jasne
  • minusy: z długich kolejek można stracić jakąś wiadomość

Przykłady (uwaga, że ​​do kolejki TTL mijamy tylko numer i komunikatów TTL - cokolwiek to będzie ciąg numeryczny):

2,1 za wiadomość TTL:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish(
    'message at ' . microtime(true), 
    null, 
    AMQP_NOPARAM, 
    array(
     'expiration' => '1000' 
    ) 
); 

2.2. Za kolejki TTL:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->setArgument('x-message-ttl', 1000); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish('message at ' . microtime(true)); 

3. Trzymać redelivers liczyć lub lewej redelivers numerem (aka hop granicę lub TTL stosu IP) w treści wiadomości lub nagłówki

  • Plusy: daje dodatkową kontrolę w wiadomościach życia na poziomie aplikacji
  • Wady: znacząca napowietrznej gdy trzeba zmodyfikować wiadomość i opublikować go ponownie, aplikacja specyficzne, nie jest jasne

Kod:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish(
    'message at ' . microtime(true), 
    null, 
    AMQP_NOPARAM, 
    array(
     'headers' => array(
      'ttl' => 100 
     ) 
    ) 
); 

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) { 
     $headers = $msg->getHeaders(); 
     echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' '; 
     echo $msg->getDeliveryTag(), ' '; 
     echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' '; 
     echo $msg->getBody(), PHP_EOL; 

     try { 
      //Do some business logic 
      throw new Exception('business logic failed'); 
     } catch (Exception $ex) { 
      //Log exception 
      if (isset($headers['ttl'])) { 
       // with ttl logic 

       if ($headers['ttl'] > 0) { 
        $headers['ttl']--; 

        $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers)); 
       } 

       return $queue->ack($msg->getDeliveryTag()); 
      } else { 
       // without ttl logic 
       return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue 
      } 

     } 

     return $queue->ack($msg->getDeliveryTag()); 
    } 
); 

Istnieje mogą być jakieś inne sposoby na lepszą komunikatu sterującego redelivers przepływ.

Wniosek: nie ma rozwiązania dla srebra. Musisz zdecydować, które rozwiązanie najlepiej pasuje do Twoich potrzeb lub dowiedzieć się czegoś innego, ale nie zapomnij go udostępnić;)

+0

Dziękuję za odpowiedź. 'redelivered' jest rzeczywiście ustawione na' true', ale muszę zrestartować mojego konsumenta blokującego, aby ponownie odczytać wiadomość. –

+0

Dzięki, to jest dokładnie to, czego potrzebowałem. Czy możesz podać mi wskazówki/sugestie, jak zapobiegać nieskończonemu dostarczaniu wiadomości? Byłoby miło, gdybym mógł opóźnić przekazywanie do kolejki o określoną ilość sekund, więc nie przeciążam mojego serwera. –

+0

gotowe, zaktualizowana odpowiedź ponownie – pinepain

0

Jeśli nie chcesz ponownie uruchamiać urządzenia, to komenda AMQP basic.recover może być tym, co Ty chcieć. Według AMQP protocol:

basic.recover(bit requeue) 

Redeliver unacknowledged messages. 

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 
+0

Ta metoda nie jest częścią interfejsu API klienta, której używam. http://www.php.net/manual/en/book.amqp.php –

+1

RabbitMQ częściowo obsługuje tę metodę, patrz [oficjalny dokument na ten temat] (https://www.rabbitmq.com/specification.html# method-status-basic.recover) – pinepain

Powiązane problemy