2017-05-22 12 views
9

Rozwiązania w RabbitMQ Wait for a message with a timeout i Wait for a single RabbitMQ message with a timeout wydają się nie działać, ponieważ nie ma kolejnej metody dostarczania w oficjalnej bibliotece C#, a QueueingBasicConsumer jest zdyskredytowany, więc po prostu wyrzuca NotSupportedException wszędzie.C# RabbitMQ czekać na jedną wiadomość przez określony czas?

Jak mogę czekać na pojedynczą wiadomość z kolejki przez określony czas?

PS

Można to zrobić poprzez Basic.Get(), tak, ale dobrze, że jest złym rozwiązaniem, aby wyciągnąć wiadomości w przedziale specififed (nadmiar ruchu, nadmiar CPU).

Aktualizacja

EventingBasicConsumer przez implmenetation NIE obsługuje natychmiastowe anulowanie. Nawet jeśli zadzwonisz pod numer BasicCancel, nawet jeśli podasz wstępne pobranie przez BasicQos - nadal będzie on pobierany w Ramkach, a te ramki mogą zawierać wiele wiadomości. Nie nadaje się więc do wykonywania pojedynczego zadania. Nie przejmuj się - po prostu nie działa z pojedynczymi wiadomościami.

Odpowiedz

5

Istnieje wiele sposobów, aby to zrobić. Na przykład można użyć EventingBasicConsumer wraz z ManualResetEvent, tak (to tylko w celach demonstracyjnych - lepiej skorzystać z jednej z poniższych metod):

var factory = new ConnectionFactory(); 
using (var connection = factory.CreateConnection()) { 
    using (var channel = connection.CreateModel()) { 
     // setup signal 
     using (var signal = new ManualResetEvent(false)) { 
      var consumer = new EventingBasicConsumer(channel); 
      byte[] messageBody = null;       
      consumer.Received += (sender, args) => { 
       messageBody = args.Body; 
       // process your message or store for later 
       // set signal 
       signal.Set(); 
      };    
      // start consuming 
      channel.BasicConsume("your.queue", false, consumer); 
      // wait until message is received or timeout reached 
      bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10)); 
      // cancel subscription 
      channel.BasicCancel(consumer.ConsumerTag); 
      if (timeout) { 
       // timeout reached - do what you need in this case 
       throw new Exception("timeout"); 
      } 

      // at this point messageBody is received 
     } 
    } 
} 

Jak stwierdzono w komentarzach - jeśli można się spodziewać wielu wiadomości na temat tej samej kolejki , to nie jest najlepszy sposób. Cóż, nie jest to najlepszy sposób w każdym przypadku, zawarłem to tylko po to, aby zademonstrować użycie ManualResetEvent w przypadku, gdy sama biblioteka nie zapewnia obsługi limitów czasu.

Jeśli robisz wywołanie RPC (zdalne wywoływanie procedury, żądanie-odpowiedź) - możesz użyć SimpleRpcClient wraz z SimpleRpcServer po stronie serwera. po stronie klienta będzie wyglądać następująco:

var client = new SimpleRpcClient(channel, "your.queue"); 
client.TimeoutMilliseconds = 10 * 1000; 
client.TimedOut += (sender, args) => { 
    // do something on timeout 
};      
var reply = client.Call(myMessage); // will return reply or null if timeout reached 

Jeszcze bardziej prosty sposób: używać podstawowego Subscription klasę (używa tego samego EventingBasicConsumer wewnętrznie, ale wspiera limity czasu, dzięki czemu nie trzeba realizować siebie), tak:

var sub = new Subscription(channel, "your.queue"); 
BasicDeliverEventArgs reply; 
if (!sub.Next(10 * 1000, out reply)) { 
    // timeout 
} 
+0

Pierwsze rozwiązanie jest nieprawidłowe. BasicConsume nie ma gwarancji zatrzymania konsumpcji na BasicCancel, może to zrobić nieco później ze względu na implementację królika w locie (po prostu spróbuj użyć jednej wiadomości na żądanie, a zobaczysz, że w niektórych przypadkach przypiszesz kilka razy komunikat messageBody). Nadal będziesz wymagać zgłaszania nadmiernych wiadomości. Dla drugiego i trzeciego spróbuję teraz:) – eocron

+0

Choć twoja górna część nie ma znaczenia, klasa subskrypcji jest dokładnie tym, czego chciałem! Dziękuję, działa cudownie! Czy możesz edytować odpowiedź, aby inni wiedzieli, że ostatnia została opracowana? – eocron

+0

Mimo to przechowuje kilka wiadomości wewnątrz przez implementację, podczas gdy ja potrzebuję tylko jeden =/ – eocron

Powiązane problemy