2011-12-20 18 views
6

Mam jednego wątku konsument ActiveMQ napisany w Javie. Wszystko, co próbuję zrobić, to odebrać() wiadomość z kolejki, spróbować wysłać ją do usługi sieciowej, a jeśli się powiedzie potwierdzić() to. Jeśli wywołanie usługi WWW nie powiedzie się, chcę, aby komunikat pozostał w kolejce i był ponownie wysyłany po pewnym czasie oczekiwania.Nie można uzyskać ActiveMQ do ponownego wysłania wiadomości

Działa mniej więcej, z wyjątkiem części do ponownego wysyłania: za każdym razem, gdy ponownie uruchamiam klienta, otrzymuje on jedną wiadomość dla każdego, kto wciąż jest w kolejce, ale po jej wysłaniu wiadomości nigdy nie są wysyłane ponownie.

Mój kod wygląda następująco:

public boolean init() throws JMSException, FileNotFoundException, IOException { 
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 
    RedeliveryPolicy policy = new RedeliveryPolicy(); 
    policy.setInitialRedeliveryDelay(500); 
    policy.setBackOffMultiplier(2); 
    policy.setUseExponentialBackOff(true); 

    connectionFactory.setRedeliveryPolicy(policy); 
    connectionFactory.setUseRetroactiveConsumer(true); // ???? 
    Connection connection = connectionFactory.createConnection(); 

    connection.setExceptionListener(this); 
    connection.start(); 

    session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 
    destination = session.createQueue(subject); //??? 

    consumer = session.createConsumer(destination); 
    //consumer.setMessageListener(this); // message listener had same behaviour 

} 

private void process() { 
    while(true) { 
     System.out.println("Waiting..."); 
     try { 
      Message message = consumer.receive(); 
      onMessage(message); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
     try { 
      Thread.sleep(500); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

@Override 
public void onMessage(Message message) { 
    System.out.println("onMessage"); 
    messagesReceived++; 

    if (message instanceof TextMessage) { 
     try { 
      TextMessage txtMsg = (TextMessage) message; 
      String msg = txtMsg.getText(); 

      if(!client.sendMessage(msg)) { 
       System.out.println("Webservice call failed. Keeping message"); 
       //message. 
      } else { 
       message.acknowledge(); 
      } 

      if (transacted) { 
       if ((messagesReceived % batch) == 0) { 
        System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived); 
        session.commit(); 
       } 
      } 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Nie jestem obecnie za pomocą transakcji (może powinienem być?).

Jestem pewien, że brakuje mi czegoś łatwego i wkrótce uderzę się w moje czoło, ale nie mogę się zorientować, jak to ma działać. Dzięki!


EDIT: Nie mogę odpowiedzieć na to sobie nie tyle jako rep:

OK po pewnym więcej eksperymentów, okazuje się transakcje są jedynym sposobem, aby to zrobić. Oto nowy kod:

public boolean init() throws JMSException, FileNotFoundException, IOException { 
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 
    RedeliveryPolicy policy = new RedeliveryPolicy(); 
    policy.setInitialRedeliveryDelay(1000L); 
    policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES); 

    connectionFactory.setRedeliveryPolicy(policy); 
    connectionFactory.setUseRetroactiveConsumer(true); 
    Connection connection = connectionFactory.createConnection(); 

    connection.setExceptionListener(this); 
    connection.start(); 

    session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE); 
    destination = session.createQueue(subject); 

    consumer = session.createConsumer(destination); 
} 

@Override 
public void onMessage(Message message) { 
    System.out.println("onMessage"); 
    messagesReceived++; 

    if (message instanceof TextMessage) { 
     try { 
      TextMessage txtMsg = (TextMessage) message; 
      String msg = txtMsg.getText(); 

      if(client.sendMessage(msg)) { 
       if(transacted) { 
        System.out.println("Call succeeded - committing message"); 
        session.commit(); 
       } 
       //message.acknowledge(); 
      } else { 
       if(transacted) { 
        System.out.println("Webservice call failed. Rolling back message"); 
        session.rollback(); 
       } 
      } 

     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Teraz wiadomość jest wysyłana ponownie co 1000ms zgodnie z zasadami ponownego dostarczania.

Mam nadzieję, że to pomoże komuś innemu! :)

+0

naprawdę bardzo pomocny. Dziękuję Ci! – Amer

+0

wow, który mnie uratował. Wygląda na to, że activemq zazwyczaj umieszcza rzeczy "z powrotem w kolejce" za około 10 sekund, chyba że podasz 1 tak, jak zrobiłeś. – rogerdpack

+0

jest bardzo pomocny dla początkujących. Zwłaszcza dlaczego wspominasz '1000ms'. – Hanumath

Odpowiedz

3

Nie trzeba używać transakcji CLIENT_ACK/Session.recover() będzie działać jak dobrze ...

Wiadomości są redelivered do klienta w przypadku wystąpienia któregokolwiek z poniższych:

  • transakcji sesji używany i wycofywania() jest wywoływana.
  • Transakowana sesja jest zamykana przed wywołaniem zatwierdzenia.
  • Sesja używa CLIENT_ACKNOWLEDGE i wywoływana jest funkcja Session.recover().

zobaczyć http://activemq.apache.org/message-redelivery-and-dlq-handling.html

+0

Próbowałem 3 punkt, co zasugerowałeś w następujący sposób. 'Session session = connection.createSession (false, Session.CLIENT_ACKNOWLEDGE);. Po kodzie biznesowym chcę ponownie wysłać wiadomość do" Activemq ", wtedy tylko ja mogę przetworzyć wiadomość (która to procedura się nie powiodła). Do tego napisałem tak . 'session.recover();'. Ale po ocenie tego programu ActiveMq nie dostarcza wiadomości do subskrybenta.Aby uzyskać więcej informacji o tym, co próbuję, sprawdź [Link] (http://stackoverflow.com/questions/18712758/how-can-i-process-unacknowledged-messages-in-activemq) – Hanumath

Powiązane problemy