2012-03-26 22 views
6

W poniższym teście Próbuję symulacji następujący scenariusz:Jak zasymulować ponowną dostawę wiadomości w scenariuszu sesji JMS AUTO_ACKNOWLEDGE?

  1. Kolejka komunikatów jest uruchamiany.
  2. Konsument przeznaczony do awarii podczas przetwarzania wiadomości jest uruchomiony.
  3. Generowany jest komunikat.
  4. Klient rozpoczyna przetwarzanie wiadomości.
  5. Podczas przetwarzania zgłasza się wyjątek, aby zasymulować niepowodzenie przetwarzania komunikatów. Nieprzerwany konsument zostaje zatrzymany.
  6. Kolejny konsument jest uruchamiany z zamiarem odebrania ponownie dostarczonej wiadomości.

Ale mój test kończy się niepowodzeniem, a wiadomość nie trafia do nowego konsumenta. Doceniam wszelkie wskazówki na ten temat.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig", 
     loader=JavaConfigContextLoader.class) 
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests { 
    @Autowired 
    private FailureReprocessTestScenario testScenario; 

    @Before 
    public void setUp() { 
     testScenario.start(); 
    } 

    @After 
    public void tearDown() throws Exception { 
     testScenario.stop(); 
    } 

    @Test public void 
    should_reprocess_task_after_processing_failure() { 
     try { 
      Thread.sleep(20*1000); 

      assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{ 
        "task-1", 
      }))); 
     } catch (InterruptedException e) { 
      fail(); 
     } 
    } 

    @Configurable 
    public static class FailureReprocessTestScenario { 
     @Autowired 
     public BrokerService broker; 

     @Autowired 
     public MockTaskProducer mockTaskProducer; 

     @Autowired 
     public FailingWorker failingWorker; 

     @Autowired 
     public SucceedingWorker succeedingWorker; 

     @Autowired 
     public TaskScheduler scheduler; 

     public void start() { 
      Date now = new Date(); 
      scheduler.schedule(new Runnable() { 
       public void run() { failingWorker.start(); } 
      }, now); 

      Date after1Seconds = new Date(now.getTime() + 1*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { mockTaskProducer.produceTask(); } 
      }, after1Seconds); 

      Date after2Seconds = new Date(now.getTime() + 2*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { 
        failingWorker.stop(); 
        succeedingWorker.start(); 
       } 
      }, after2Seconds); 
     } 

     public void stop() throws Exception { 
      succeedingWorker.stop(); 
      broker.stop(); 
     } 
    } 

    @Configuration 
    @ImportResource(value={"classpath:applicationContext-jms.xml", 
      "classpath:applicationContext-task.xml"}) 
    public static class ContextConfig { 
     @Autowired 
     private ConnectionFactory jmsFactory; 

     @Bean 
     public FailureReprocessTestScenario testScenario() { 
      return new FailureReprocessTestScenario(); 
     } 

     @Bean 
     public MockTaskProducer mockTaskProducer() { 
      return new MockTaskProducer(); 
     } 

     @Bean 
     public FailingWorker failingWorker() { 
      TaskListener listener = new TaskListener(); 
      FailingWorker worker = new FailingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     @Bean 
     public SucceedingWorker succeedingWorker() { 
      TaskListener listener = new TaskListener(); 
      SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     private DefaultMessageListenerContainer listenerContainer(TaskListener listener) { 
      DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer(); 
      listenerContainer.setConnectionFactory(jmsFactory); 
      listenerContainer.setDestinationName("tasksQueue"); 
      listenerContainer.setMessageListener(listener); 
      listenerContainer.setAutoStartup(false); 
      listenerContainer.initialize(); 
      return listenerContainer; 
     } 

    } 

    public static class FailingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(FailingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public FailingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
     } 

     public void start() { 
      LOG.info("FailingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("FailingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("FailingWorker.processTask(" + task + ")"); 
      try { 
       Thread.sleep(1*1000); 
       throw Throwables.propagate(new Exception("Simulate task processing failure")); 
      } catch (InterruptedException e) { 
       LOG.log(Level.SEVERE, "Unexpected interruption exception"); 
      } 
     } 
    } 

    public static class SucceedingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public final List<String> processedTasks; 

     public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
      this.processedTasks = new ArrayList<String>(); 
     } 

     public void start() { 
      LOG.info("SucceedingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("SucceedingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("SucceedingWorker.processTask(" + task + ")"); 
      try { 
       TextMessage taskText = (TextMessage) task; 
       processedTasks.add(taskText.getText()); 
      } catch (JMSException e) { 
       LOG.log(Level.SEVERE, "Unexpected exception during task processing"); 
      } 
     } 
    } 

} 

TaskListener.java

public class TaskListener implements MessageListener { 

    private TaskProcessor processor; 

    @Override 
    public void onMessage(Message message) { 
     processor.processTask(message); 
    } 

    public void setProcessor(TaskProcessor processor) { 
     this.processor = processor; 
    } 

} 

MockTaskProducer.java

@Configurable 
public class MockTaskProducer implements ApplicationContextAware { 
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName()); 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    private Destination destination; 

    private int taskCounter = 0; 

    public void produceTask() { 
     LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")"); 

     taskCounter++; 

     jmsTemplate.send(destination, new MessageCreator() { 
      @Override 
      public Message createMessage(Session session) throws JMSException { 
       TextMessage message = session.createTextMessage("task-" + taskCounter); 
       return message; 
      } 
     }); 
    } 

    @Override 
    public void setApplicationContext(ApplicationContext applicationContext) 
      throws BeansException { 
     destination = applicationContext.getBean("tasksQueue", Destination.class); 
    } 
} 
+1

Po ustawieniu 'listenerContainer.setSessionTransacted (true)' widzę, że wiadomość zostanie ponownie dostarczona, ale tylko do 'FailingWorker'. Zdarzenie po zatrzymaniu odpowiedniego kontenera odbiornika, 'SuccessLineWorker' nigdy nie otrzymuje ponownie dostarczonego komunikatu. –

+1

Pojawia się metoda 'listenerContainer.stop()' -metoda nie zamyka połączenia z dostarczeniami, w związku z czym dostawca JMS nadal próbuje ponownie dostarczyć komunikat o niepowodzeniu do tego samego konsumenta. Aby uniknąć tego, że upadający konsument powinien w pewnym momencie wywołać funkcję "listenerContainer.shutdown()". –

Odpowiedz

7

Wygląda na to, że źródło dokumentacji, którą szukałem wczoraj, to Creating Robust JMS Applications wprowadzić mnie w błąd (lub być może zrozumiałem to niepoprawnie). Zwłaszcza ten fragment:

Do czasu potwierdzenia komunikatu JMS, nie jest on uznawany za pomyślnie pobrany. Pomyślne zużycie wiadomości zwykle odbywa się w trzech etapach.

  1. Klient otrzymuje wiadomość.
  2. Klient przetwarza komunikat.
  3. Wiadomość zostanie potwierdzona. Potwierdzanie jest inicjowane przez dostawcę JMS lub przez klienta, w zależności od trybu potwierdzania sesji .

Przypuszczałem AUTO_ACKNOWLEDGE robi dokładnie to - potwierdził wiadomość po sposób słuchacz zwraca wynik. Ale zgodnie ze specyfikacją JMS jest nieco inny i zgodnie z oczekiwaniami pojemniki Spring nasłuchu nie powinny zmieniać zachowania ze specyfikacji JMS.To właśnie javadoc z AbstractMessageListenerContainer ma do powiedzenia - Mam podkreślił ważnych zdań:

Pojemnik słuchacz oferuje następujące stwierdzenie wiadomość opcje:

  • „sessionAcknowledgeMode” na " AUTO_ACKNOWLEDGE "(domyślnie): Automatyczne potwierdzenie wiadomości przed wykonaniem detektora; brak ponownego dostarczenia w przypadku wyjątku zgłoszonego.
  • "sessionAcknowledgeMode" ustawiony na "CLIENT_ACKNOWLEDGE": Automatyczne potwierdzenie komunikatu po pomyślnym wykonaniu detektora; nr ponowne dostarczenie w przypadku wyjątku zgłoszonego.
  • "sessionAcknowledgeMode" ustawiona na "DUPS_OK_ACKNOWLEDGE": Potwierdzenie leniwego komunikatu w trakcie lub po wykonaniu detektora; potencjalne ponowne dostarczenie w przypadku wyjątku zgłoszonego.
  • "sessionTransacted" ustawiona na "true": Potwierdzenie transakcyjne po pomyślnym wykonaniu detektora; gwarantowane ponowne dostarczenie w przypadku wyjątku zgłoszonego.

Tak więc kluczem do mojego rozwiązania jest listenerContainer.setSessionTransacted(true);

Innym problemem wychodził było to, że dostawca JMS utrzymuje redelivering uszkodzoną wiadomość z powrotem do tego samego konsumenta, że ​​zawiódł podczas przetwarzania wiadomości. Nie wiem, czy specyfikacja JMS podaje receptę, co powinien zrobić usługodawca w takich sytuacjach, ale dla mnie zadziałało użycie listenerContainer.shutdown(); w celu rozłączenia upadającego konsumenta i umożliwienia dostawcy ponownego dostarczenia wiadomości i udzielenia szansy. do innego konsumenta.

Powiązane problemy