2015-03-23 15 views
5

RabbitMQ 3.5 teraz supports message priority; Nie mogę jednak zbudować działającego przykładu. Umieściłem mój kod poniżej. Obejmuje wynik, którego się spodziewam, i dane wyjściowe. Byłbym zainteresowany większą ilością dokumentacji i/lub przykładem roboczym.RabbitMQ 3.5 i Priorytet wiadomości

Moje pytanie w skrócie: Jak uzyskać priorytet wiadomości do pracy w Rabbit 3.5.0.0?

Wydawca:

using System; 
using RabbitMQ.Client; 
using System.Text; 
using System.Collections.Generic; 

class Publisher 
{ 

    public static void Main() 
    { 
     var factory = new ConnectionFactory() { HostName = "localhost" }; 
     using (var connection = factory.CreateConnection()) 
     { 
      using (var channel = connection.CreateModel()) 
      { 
       IDictionary <String , Object> args = new Dictionary<String,Object>() ; 
       args.Add(" x-max-priority ", 10); 
       channel.QueueDeclare("task_queue1", true, false, true, args); 

       for (int i = 1 ; i<=10; i++) 
       { 
        var message = "Message"; 
        var body = Encoding.UTF8.GetBytes(message + " " + i); 
        var properties = channel.CreateBasicProperties(); 
        properties.SetPersistent(true); 
        properties.Priority = Convert.ToByte(i); 
        channel.BasicPublish("", "task_queue1", properties, body); 
       } 
      } 
     } 
    } 
} 

konsumentów:

using System; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 
using System.Text; 
using System.Threading; 
using System.Collections.Generic; 

namespace Consumer 
{ 
    class Worker 
    { 
     public static void Main() 
     { 
      var factory = new ConnectionFactory() { HostName = "localhost" }; 
      using (var connection = factory.CreateConnection()) 
      { 
       using (var channel = connection.CreateModel()) 
       { 
        IDictionary<String, Object> args = new Dictionary<String, Object>();      
        channel.BasicQos(0, 1, false); 
        var consumer = new QueueingBasicConsumer(channel); 
        IDictionary<string, object> consumerArgs = new Dictionary<string, object>(); 
        channel.BasicConsume("task_queue1", false, "", args, consumer); 
        Console.WriteLine(" [*] Waiting for messages. " + 
             "To exit press CTRL+C"); 
        while (true) 
        { 
         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
         var body = ea.Body; 
         var message = Encoding.UTF8.GetString(body); 
         Console.WriteLine(" [x] Received {0}", message); 
         channel.BasicAck(ea.DeliveryTag, false); 
        } 
       } 
      } 
     } 
    } 
} 

Rzeczywista moc:

[*] Waiting for messages. To exit press CTRL+C 
[x] Received Message 1 
[x] Received Message 2 
[x] Received Message 3 
[x] Received Message 4 
[x] Received Message 5 
[x] Received Message 6 
[x] Received Message 7 
[x] Received Message 8 
[x] Received Message 9 
[x] Received Message 10 

oczekiwany wynik:

[*] Waiting for messages. To exit press CTRL+C 
[x] Received Message 10 
[x] Received Message 9 
[x] Received Message 8 
[x] Received Message 7 
[x] Received Message 6 
[x] Received Message 5 
[x] Received Message 4 
[x] Received Message 3 
[x] Received Message 2 
[x] Received Message 1 

AKTUALIZACJA # 1. Znalazłem przykład w Javie here. Jednak jest to Rabbit 3.4.x.x. dodatek, który został włączony do 3.5. Jedyną różnicą, jaką widzę, jest to, że wyrażają priorytet jako int, a mój to bajt. Ale czuję, że to czerwony śledzia. Jestem tu trochę przegrany.

Odpowiedz

6

Cóż, rozwiązałem to. To był głupi błąd. pisałem:

args.Add(" x-max-priority ", 10); 

Powinno być

args.Add("x-max-priority", 10); 

będę to zostawić tak inni ludzie mogą mieć przykład roboczy RabbitMQ 3,5 za kolejek priorytetowych w C#.

2

Podobny RabbitMQ Priority Queue Wdrożenie w Węźle JS

Install amqplib

W celu przetestowania, jesteśmy zobowiązani do posiadania amqplib zainstalowany

npm install amqplib 

Publisher (wysyłanie. js)

#!/usr/bin/env node 

var amqp = require('amqplib/callback_api'); 

function bail(err, conn) { 
    console.error(err); 
    if (conn) conn.close(function() { process.exit(1); }); 
} 

function on_connect(err, conn) { 
    if (err !== null) return bail(err); 

    // name of queue 
    var q = 'hello'; 
    var msg = 'Hello World!'; 
    var priorityValue = 0; 

    function on_channel_open(err, ch) { 
    if (err !== null) return bail(err, conn); 
    // maxPriority : max priority value supported by queue 
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) { 
     if (err !== null) return bail(err, conn); 

     for(var index=1; index<=100; index++) { 
      priorityValue = Math.floor((Math.random() * 10)); 
      msg = 'Hello World!' + ' ' + index + ' ' + priorityValue; 
      ch.publish('', q, new Buffer(msg), {priority: priorityValue}); 
      console.log(" [x] Sent '%s'", msg); 
     } 

     ch.close(function() { conn.close(); }); 
    }); 
    } 

    conn.createChannel(on_channel_open); 
} 

amqp.connect(on_connect); 

abonenta (receive.js)

#!/usr/bin/env node 

var amqp = require('amqplib/callback_api'); 

function bail(err, conn) { 
    console.error(err); 
    if (conn) conn.close(function() { process.exit(1); }); 
} 

function on_connect(err, conn) { 
    if (err !== null) return bail(err); 
    process.once('SIGINT', function() { conn.close(); }); 

    var q = 'hello'; 

    function on_channel_open(err, ch) { 
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) { 
     if (err !== null) return bail(err, conn); 
     ch.consume(q, function(msg) { // message callback 
     console.log(" [x] Received '%s'", msg.content.toString()); 
     }, {noAck: true}, function(_consumeOk) { // consume callback 
     console.log(' [*] Waiting for messages. To exit press CTRL+C'); 
     }); 
    }); 
    } 

    conn.createChannel(on_channel_open); 
} 

amqp.connect(on_connect); 

Run:

node send.js 

Będzie utworzyć kolejkę o nazwie 'cześć' i będzie ją zalać '1000' wiadomości próbki stosując wymianę domyślny AMQP.

node receive.js 

Klient będzie subskrybować wiadomości oczekujące w kolejce.

0

Inna możliwość (dla przyszłych wyszukiwań)

„PUSH” metoda dostarczania wiadomości nie wydaje się szanować priorytet.

http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html

poniżej cytat z powyższego URL. Odważyłem ważną część.

pobierania wiadomości w drodze subskrypcji („Push API”)

Innym sposobem, aby otrzymywać wiadomości jest utworzenie subskrypcji za pomocą interfejsu IBasicConsumer. Wiadomości będą dostarczane automatycznie po ich otrzymaniu, zamiast wymagać proaktywnego żądania. Jednym ze sposobów na wdrożenie konsumenta jest użycie EventingBasicConsumer klasy komfort, który wywoła dostawy i inne wydarzenia w cyklu życia konsumentów jak C# zdarzeń:

var consumer = new EventingBasicConsumer(channel); 
consumer.Received += (ch, ea) => 
       { 
        var body = ea.Body; 
        // ... process the message 
        ch.BasicAck(ea.DeliveryTag, false); 
       }; 
String consumerTag = channel.BasicConsume(queueName, false, consumer); 

Poprzez zmianę metody „pull”, Priorytet wydaje się być przestrzegane. Jednak w środki poniżej (z tej samej zawartości powyżej), wygląda na to jest kompromis (który mam pogrubione)

Pobieranie pojedynczych wiadomości („pull API”) Aby pobrać pojedyncze wiadomości, użyj IModel.BasicGet. Zwrócona wartość jest instancją BasicGetResult, z którego informacje nagłówka (właściwości) oraz treść wiadomości można wyodrębnić:

Od Noack = false powyżej, należy również zadzwonić IModel.BasicAck przyznać, że ciebie pomyślnie odbierane i przetwarzane komunikat:

... 
    // acknowledge receipt of the message 
    channel.BasicAck(result.DeliveryTag, false); 
} 

Należy pamiętać, że pobieranie wiadomości przy użyciu tego API jest stosunkowo nieefektywne. Jeśli wolisz, aby RabbitMQ przekazywał wiadomości do klienta, zapoznaj się z następną sekcją.

(Sekcja "następny" w tym przypadku przeniesie Cię do metody "push" na górze tego posta)

Powiązane problemy