2015-07-12 14 views
11

Zastanawiam się, dlaczego mój klient RabbitMQ RPC zawsze przetwarzał martwe wiadomości po ponownym uruchomieniu. _channel.QueueDeclare(queue, false, false, false, null); powinien wyłączyć bufory. Jeśli przeładowuję QueueDeclare wewnątrz klienta RPC, nie mogę połączyć się z serwerem. Czy coś tu jest nie tak? Każdy pomysł, jak rozwiązać ten problem?Trwała kolejka RabbitMQ nie działa (serwer RPC, klient RPC)


RPC-Server

new Thread(() => 
{ 
    var factory = new ConnectionFactory { HostName = _hostname }; 
    if (_port > 0) 
     factory.Port = _port; 
    _connection = factory.CreateConnection(); 
    _channel = _connection.CreateModel(); 

    _channel.QueueDeclare(queue, false, false, false, null); 
    _channel.BasicQos(0, 1, false); 
    var consumer = new QueueingBasicConsumer(_channel); 
    _channel.BasicConsume(queue, false, consumer); 
    IsRunning = true; 
    while (IsRunning) 
    { 
     BasicDeliverEventArgs ea; 
     try { 
      ea = consumer.Queue.Dequeue(); 
     } 
     catch (Exception ex) { 
      IsRunning = false; 
     } 
     var body = ea.Body; 
     var props = ea.BasicProperties; 
     var replyProps = _channel.CreateBasicProperties(); 
     replyProps.CorrelationId = props.CorrelationId; 

     var xmlRequest = Encoding.UTF8.GetString(body); 

     var messageRequest = XmlSerializer.DeserializeObject(xmlRequest, typeof(Message)) as Message; 
     var messageResponse = handler(messageRequest); 

     _channel.BasicPublish("", props.ReplyTo, replyProps, 
           messageResponse); 
     _channel.BasicAck(ea.DeliveryTag, false); 
    } 
}).Start(); 

RPC-Client

public void Start() 
{ 
    if (IsRunning) 
     return; 
    var factory = new ConnectionFactory { 
     HostName = _hostname, 
     Endpoint = _port <= 0 ? new AmqpTcpEndpoint(_endpoint) 
           : new AmqpTcpEndpoint(_endpoint, _port) 
    }; 
    _connection = factory.CreateConnection(); 
    _channel = _connection.CreateModel(); 
    _replyQueueName = _channel.QueueDeclare(); // Do not connect any more 
    _consumer = new QueueingBasicConsumer(_channel); 
    _channel.BasicConsume(_replyQueueName, true, _consumer); 
    IsRunning = true; 
} 

public Message Call(Message message) 
{ 
    if (!IsRunning) 
     throw new Exception("Connection is not open."); 
    var corrId = Guid.NewGuid().ToString().Replace("-", ""); 
    var props = _channel.CreateBasicProperties(); 
    props.ReplyTo = _replyQueueName; 
    props.CorrelationId = corrId; 

    if (!String.IsNullOrEmpty(_application)) 
     props.AppId = _application; 

    message.InitializeProperties(_hostname, _nodeId, _uniqueId, props); 

    var messageBytes = Encoding.UTF8.GetBytes(XmlSerializer.ConvertToString(message)); 
    _channel.BasicPublish("", _queue, props, messageBytes); 

    try 
    { 
     while (IsRunning) 
     { 
      var ea = _consumer.Queue.Dequeue(); 
      if (ea.BasicProperties.CorrelationId == corrId) 
      { 
       var xmlResponse = Encoding.UTF8.GetString(ea.Body); 
       try 
       { 
        return XmlSerializer.DeserializeObject(xmlResponse, typeof(Message)) as Message; 
       } 
       catch(Exception ex) 
       { 
        IsRunning = false; 
        return null; 
       } 
      } 
     } 
    } 
    catch (EndOfStreamException ex) 
    { 
     IsRunning = false; 
     return null; 
    } 
    return null; 
} 

Odpowiedz

6

Spróbuj ustawić właściwość DeliveryMode do (1) non-persistent w zdalnym wywołaniu procedury Kod klienta podobny do tego:

public Message Call(Message message) 
{ 
    ... 
    var props = _channel.CreateBasicProperties(); 
    props.DeliveryMode = 1; //you might want to do this in your RPC-Server as well 
    ... 
} 

AMQP Model Explained zawiera bardzo przydatne zasoby, takie jak wyjaśnienie, w jaki sposób obchodzić się z wiadomościami, które znajdą się w kolejce oczekujących wiadomości.

Innym przydatnym notatka z dokumentacją w odniesieniu do kolejki trwałość:

trwałe kolejki są zachowywane na dysku, a tym samym przetrwać restartów Broker. Kolejki, które nie są trwałe, nazywane są przejściowymi. Nie wszystkie scenariusze mogą być trwałe.

Trwałość kolejki nie powoduje, że wiadomości kierowane do tej kolejki są trwałe. Jeśli broker zostanie usunięty, a następnie zostanie przywrócony, trwająca kolejka zostanie ponownie zadeklarowana podczas uruchamiania brokera, jednak tylko trwałe wiadomości zostaną odzyskane.

Zauważ, że mówi o pośrednika restart nie wydawcy lub restartu konsumentów.

+0

Czy to pomogło @ MR.ABC? –