Mamy kod Clojure, który czyta się z kolejki Królika. Chcielibyśmy tolerować przypadek, w którym serwer RabbitMQ jest na krótko wyłączony, np. w przypadku ponownego uruchomienia (sudo service rabbitmq-server restart
).Jak tolerować ponowne uruchamianie RabbitMQ w Langohr?
Wygląda na to, że w Langohr jest some provision for reconnecting. Dostosowaliśmy przykład clojurewerkz.langohr.examples.recovery.example1
(Gist here). Niewielkie różnice w stosunku do opublikowanego przykładu obejmują parametry połączenia i usunięcie wywołania lb/publish
(ponieważ uzupełniamy dane zewnętrznym źródłem).
Możemy z powodzeniem pobierać dane z kolejki i czekać na kolejne wiadomości. Jednak, kiedy ponownie uruchomić RMQ (przez wyżej sudo
polecenia na VM hostingu RabbitMQ), następujące jest wyjątek:
Caught an exception during connection recovery!
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:545)
at com.novemberain.langohr.Connection.recoverConnection(Connection.java:166)
at com.novemberain.langohr.Connection.beginAutomaticRecovery(Connection.java:115)
at com.novemberain.langohr.Connection.access$000(Connection.java:18)
at com.novemberain.langohr.Connection$1.shutdownCompleted(Connection.java:93)
at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321)
... 8 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:273)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
Wydaje się prawdopodobne, że zamierzony mechanizm restartu dostarczone przez Langohr łamie gdy w rzutach. Czy istnieje alternatywny wzór, który jest preferowany w przypadku tych "twardych" ponownych uruchomień? Alternatywnie, przypuszczam, że musimy zaimplementować monitorowanie połączeń i próbować siebie. Wszelkie sugestie będą mile widziane.