2008-09-29 12 views
9

Filtrowanie wiadomości JMS według JMSCorrelationID

Jak utworzyć instancję kolejkowania JMS w java (JRE/JDK/J2EE 1.4), która otrzymuje tylko komunikaty pasujące do podanego JMSCorrelationID? Wiadomości, które chcę odebrać, zostały opublikowane w kolejce, a nie w temacie, ale w razie potrzeby mogą się zmienić.

Oto kod, który obecnie używam umieścić komunikat w kolejce: konfiguracja połączenie

/** 
* publishResponseToQueue publishes Requests to the Queue. 
* 
* @param jmsQueueFactory    -Name of the queue-connection-factory 
* @param jmsQueue     -The queue name for the request 
* @param response      -A response object that needs to be published 
* 
* @throws ServiceLocatorException  -An exception if a request message 
*          could not be published to the Topic 
*/ 
private void publishResponseToQueue(String jmsQueueFactory, 
            String jmsQueue, 
            Response response) 
     throws ServiceLocatorException { 

    if (logger.isInfoEnabled()) { 
     logger.info("Begin publishRequestToQueue: " + 
         jmsQueueFactory + "," + jmsQueue + "," + response); 
    } 
    logger.assertLog(jmsQueue != null && !jmsQueue.equals(""), 
         "jmsQueue cannot be null"); 
    logger.assertLog(jmsQueueFactory != null && !jmsQueueFactory.equals(""), 
         "jmsQueueFactory cannot be null"); 
    logger.assertLog(response != null, "Request cannot be null"); 

    try { 

     Queue queue = (Queue)_context.lookup(jmsQueue); 

     QueueConnectionFactory factory = (QueueConnectionFactory) 
      _context.lookup(jmsQueueFactory); 

     QueueConnection connection = factory.createQueueConnection(); 
     connection.start(); 
     QueueSession session = connection.createQueueSession(false, 
            QueueSession.AUTO_ACKNOWLEDGE); 

     ObjectMessage objectMessage = session.createObjectMessage(); 

     objectMessage.setJMSCorrelationID(response.getID()); 

     objectMessage.setObject(response); 

     session.createSender(queue).send(objectMessage); 

     session.close(); 
     connection.close(); 

    } catch (Exception e) { 
     //XC3.2 Added/Modified BEGIN 
     logger.error("ServiceLocator.publishResponseToQueue - Could not publish the " + 
         "Response to the Queue - " + e.getMessage()); 
     throw new ServiceLocatorException("ServiceLocator.publishResponseToQueue " + 
              "- Could not publish the " + 
         "Response to the Queue - " + e.getMessage()); 
     //XC3.2 Added/Modified END 
    } 

    if (logger.isInfoEnabled()) { 
     logger.info("End publishResponseToQueue: " + 
         jmsQueueFactory + "," + jmsQueue + response); 
    } 

} // end of publishResponseToQueue method 

Odpowiedz

10

Kolejka jest taka sama, ale gdy masz QueueSession można ustawić selektor podczas tworzenia słuchawkę.

QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'"); 

następnie

receiver.receive() 

lub

receiver.setListener(myListener); 
+0

Ostatnio czytałem na ten sam temat i mam pytanie w następujący sposób: czy odbiorca nadal otrzyma wiadomości, które nie zawierają wymaganego identyfikatora korelacji i po cichu upuszczą je bez przetwarzania, czy też Sam dostawca JMS nie dostarcza takich komunikatów do odbiorcy, aby nadal pozostawać w kolejce? Uważam, że to drugie podejście jest poprawne, ale chcę je zweryfikować. Dzięki. – shrini1000

+0

@ shrini1000 masz rację. – Trying

5

BTW a jej nie rzeczywista pytanie pytasz - jeśli starają się realizować żądania odpowiedzi nad JMS Polecam reading this article jako Interfejs API JMS jest nieco bardziej skomplikowany, niż można sobie wyobrazić, a wykonanie tego w wydajny sposób jest o wiele trudniejsze, niż się wydaje.

W szczególności to use JMS efficiently należy starać się unikać tworzenia konsumentów dla pojedynczej wiadomości itp

także dlatego, że API JMS jest tak bardzo skomplikowane w użyciu prawidłowo i skutecznie - szczególnie z poolingu, transakcji i jednoczesnego przetwarzania - Polecam ludzie hide the middleware from their application code takie jak poprzez używając Apache Camel's Spring Remoting implementation for JMS

+0

Zrobiłabym sobie dużo przeróbki kół, gdybym wiedział o Camelu kilka lat temu. –

0
String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; 
QueueReceiver receiver = session.createReceiver(queue, filter); 

Tutaj odbiornik dostanie wiadomości dla których JMSCorrelationID jest równa MessageID. jest to bardzo pomocne w paradygmacie żądanie/odpowiedź.

lub można bezpośrednio ustawić na dowolną wartość:

QueueReceiver receiver = session.createReceiver(queue, "JMSCorrelationID ='"+id+"'";); 

niż można zrobić albo receiver.receive(2000); lub receiver.setMessageListener(this);

2

nadzieję, że to pomoże. Użyłem Open MQ.

package com.MQueues; 

import java.util.UUID; 

import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.QueueConnection; 
import javax.jms.QueueReceiver; 
import javax.jms.QueueSession; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import com.sun.messaging.BasicQueue; 
import com.sun.messaging.QueueConnectionFactory; 

public class HelloProducerConsumer { 

public static String queueName = "queue0"; 
public static String correlationId; 

public static String getCorrelationId() { 
    return correlationId; 
} 

public static void setCorrelationId(String correlationId) { 
    HelloProducerConsumer.correlationId = correlationId; 
} 

public static String getQueueName() { 
    return queueName; 
} 

public static void sendMessage(String threadName) { 
    correlationId = UUID.randomUUID().toString(); 
    try { 

     // Start connection 
     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 
     BasicQueue destination = (BasicQueue) session.createQueue(threadName); 
     MessageProducer producer = session.createProducer(destination); 
     connection.start(); 

     // create message to send 
     TextMessage message = session.createTextMessage(); 
     message.setJMSCorrelationID(correlationId); 
     message.setText(threadName + "(" + System.currentTimeMillis() 
       + ") " + correlationId +" from Producer"); 

     System.out.println(correlationId +" Send from Producer"); 
     producer.send(message); 

     // close everything 
     producer.close(); 
     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void receivemessage(final String correlationId) { 
    try { 

     QueueConnectionFactory cf = new QueueConnectionFactory(); 
     QueueConnection connection = cf.createQueueConnection(); 
     QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 

     BasicQueue destination = (BasicQueue) session.createQueue(getQueueName()); 

     connection.start(); 

     System.out.println("\n"); 
     System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 
     long now = System.currentTimeMillis(); 

     // receive our message 
     String filter = "JMSCorrelationID = '" + correlationId + "'"; 
     QueueReceiver receiver = session.createReceiver(destination, filter); 
     TextMessage m = (TextMessage) receiver.receive(); 
     System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp()); 

     System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage"); 

     session.close(); 
     connection.close(); 

    } catch (JMSException ex) { 
     System.out.println("Error = " + ex.getMessage()); 
    } 
} 

public static void main(String args[]) { 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId1 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId2 = getCorrelationId(); 
    HelloProducerConsumer.sendMessage(getQueueName()); 
    String correlationId3 = getCorrelationId(); 


    HelloProducerConsumer.receivemessage(correlationId2); 

    HelloProducerConsumer.receivemessage(correlationId1); 

    HelloProducerConsumer.receivemessage(correlationId3); 
} 
} 
Powiązane problemy