Mam jednego wątku konsument ActiveMQ napisany w Javie. Wszystko, co próbuję zrobić, to odebrać() wiadomość z kolejki, spróbować wysłać ją do usługi sieciowej, a jeśli się powiedzie potwierdzić() to. Jeśli wywołanie usługi WWW nie powiedzie się, chcę, aby komunikat pozostał w kolejce i był ponownie wysyłany po pewnym czasie oczekiwania.Nie można uzyskać ActiveMQ do ponownego wysłania wiadomości
Działa mniej więcej, z wyjątkiem części do ponownego wysyłania: za każdym razem, gdy ponownie uruchamiam klienta, otrzymuje on jedną wiadomość dla każdego, kto wciąż jest w kolejce, ale po jej wysłaniu wiadomości nigdy nie są wysyłane ponownie.
Mój kod wygląda następująco:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true); // ????
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
destination = session.createQueue(subject); //???
consumer = session.createConsumer(destination);
//consumer.setMessageListener(this); // message listener had same behaviour
}
private void process() {
while(true) {
System.out.println("Waiting...");
try {
Message message = consumer.receive();
onMessage(message);
} catch (JMSException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(!client.sendMessage(msg)) {
System.out.println("Webservice call failed. Keeping message");
//message.
} else {
message.acknowledge();
}
if (transacted) {
if ((messagesReceived % batch) == 0) {
System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
session.commit();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Nie jestem obecnie za pomocą transakcji (może powinienem być?).
Jestem pewien, że brakuje mi czegoś łatwego i wkrótce uderzę się w moje czoło, ale nie mogę się zorientować, jak to ma działać. Dzięki!
EDIT: Nie mogę odpowiedzieć na to sobie nie tyle jako rep:
OK po pewnym więcej eksperymentów, okazuje się transakcje są jedynym sposobem, aby to zrobić. Oto nowy kod:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000L);
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true);
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(client.sendMessage(msg)) {
if(transacted) {
System.out.println("Call succeeded - committing message");
session.commit();
}
//message.acknowledge();
} else {
if(transacted) {
System.out.println("Webservice call failed. Rolling back message");
session.rollback();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Teraz wiadomość jest wysyłana ponownie co 1000ms zgodnie z zasadami ponownego dostarczania.
Mam nadzieję, że to pomoże komuś innemu! :)
naprawdę bardzo pomocny. Dziękuję Ci! – Amer
wow, który mnie uratował. Wygląda na to, że activemq zazwyczaj umieszcza rzeczy "z powrotem w kolejce" za około 10 sekund, chyba że podasz 1 tak, jak zrobiłeś. – rogerdpack
jest bardzo pomocny dla początkujących. Zwłaszcza dlaczego wspominasz '1000ms'. – Hanumath