2011-08-27 15 views
12

Pracuję z JMS i ActiveMQ. Wszystko działa cuda. Nie używam sprężyny ani nie.Sygnalizacja wycofania z JMS MessageListener

Interfejs javax.jms.MessageListener ma tylko jedną metodę, onMessage. Wewnątrz implementacji istnieje szansa, że ​​zostanie zgłoszony wyjątek. Jeśli w rzeczywistości zostanie zgłoszony wyjątek, to znaczy, że wiadomość nie została poprawnie przetworzona i musi zostać ponownie sprawdzona. Potrzebuję ActiveMQ, aby poczekać chwilę, a następnie spróbować ponownie. tzn. potrzebuję wyjątku, aby wycofać transakcję JMS.

Jak mogę osiągnąć takie zachowanie?

Być może jest jakaś konfiguracja w ActiveMQ, której nie mogłem znaleźć.

Albo ... może mogłaby zlikwidować rejestracji MessageListener s konsumentom i zużywają komunikatów siebie, w pętli jak:

while (true) { 
    // ... some administrative stuff like ... 
    session = connection.createSesstion(true, SESSION_TRANSACTED) 
    try { 
     Message m = receiver.receive(queue, 1000L); 
     theMessageListener.onMessage(m); 
     session.commit(); 
    } catch (Exception e) { 
     session.rollback(); 
     Thread.sleep(someTimeDefinedSomewhereElse); 
    } 
    // ... some more administrative stuff 
} 

w kilku wątków, zamiast rejestracji słuchacza.

Lub ... Mogłem jakoś udekorować/AOP/bajt-manipulować MessageListener s, aby zrobić to samemu.

Jaką drogę byś wybrał i dlaczego?

uwaga: Nie mam pełnej kontroli nad kodem MessageListener.

EDIT Test na dowód koncepcji:

@Test 
@Ignore("Interactive test, just a proof of concept") 
public void transaccionConListener() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     @Override 
     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         message.acknowledge(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         message.acknowledge(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    brokerService.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 
+0

Bardzo dziękuję Whaley i @Ammar za odpowiedzi. Obstawiam oba, odkąd obaj wskazaliście mi właściwy tor. Ale nie wybieram jeszcze właściwej odpowiedzi. Ponieważ potrzeba więcej testów. –

Odpowiedz

10

Jeśli chcesz użyć SESSION_TRANSACTED jako tryb potwierdzenia, to trzeba skonfigurować RedeliveryPolicy on your Connection/ConnectionFactory. This page on ActiveMQ's website zawiera również dobre informacje o tym, co być może trzeba zrobić.

Ponieważ nie używasz Spring, można skonfigurować do RedeliveryPolicy z czymś podobnym do następującego kodu (zaczerpnięte z jednego z linków powyżej):

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); 
policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

Edytuj odrywania fragment kodu dodany do odpowiedzi, poniżej pokazano, jak to działa z transakcjami. Wypróbuj ten kod za pomocą metody Session.rollback() skomentowanej, a zobaczysz, że używasz SESION_TRANSACTED i Session.commit/rollback działa zgodnie z oczekiwaniami:

@Test 
public void test() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         session.commit(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         session.rollback(); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         session.commit(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 

}

+0

To nie zadziałało. Ale wskazał mi właściwy kierunek. Opuszczę DUPS_OK_ACKNOWLEDGE, ponieważ wydaje mi się, że to ten, który działa, muszę jak najmniej pracować. –

+0

Musisz wkleić cały kod, ponieważ nie robisz czegoś poprawnie z Sesją. Wydaje się, że DUPS_OK_ACKNOWLEDGE działa, ponieważ potwierdzenie klienta jest leniwy, a broker będzie po prostu kontynuował wysyłanie wiadomości, dopóki klient ostatecznie nie potwierdzi. – whaley

+0

I wkleiłem dowód koncepcji. Mogę tylko sprawić, żeby działało z DUPS_OK_ACKNOWLEDGE, a message.acknowledgement nie robi różnicy. –

2

Musisz ustawić tryb potwierdzenia do Session.CLIENT_ACKNOWLEDGE, klient uznaje się zużywanej wiadomość poprzez wywołanie metody potwierdzić Komunikat jest.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Następnie, po przetworzeniu komunikatu do zadzwonić do sposobu Message.acknowledge (i), w celu usunięcia tej wiadomości.

Message message = ...; 
// Processing message 

message.acknowledge(); 
+0

To nie działa. _onMessage_ wciąż jest wywoływany raz, nawet jeśli _message.acknowledge() _ nigdy nie zostaje wywołany. –

+0

Czy poprawnie ustawiłeś tryb potwierdzania? Musi być ustawiony na Session.CLIENT_ACKNOWLEDGE! – Ammar

+0

Ale działa z (false, Session.DUPS_OK_ACKNOWLEDGE) ... message.acknowledge() nie wydaje się załatwiać. –

0

Jeśli sesja jest przedmiotem transakcji, a następnie "acknowledgeMode" jest ignorowany anyways..So, zostaw sesja transakcji i używać session.rollback i session.commit zatwierdzić lub wycofać transakcję.

+1

Myślę (moim) problemem jest to, że sesja nie jest dostępna w ramach MessageListener.onMessage (Message). –