2014-04-24 6 views
5

Abbiamo il codice Clojure che legge da una coda di coniglio. Vorremmo tollerare il caso in cui il server RabbitMQ è inattivo brevemente, ad es. in caso di riavvio (sudo service rabbitmq-server restart).Come tollerare il riavvio di RabbitMQ in Langohr?

Sembra essere some provision for reconnecting in Langohr. Abbiamo adattato l'esempio clojurewerkz.langohr.examples.recovery.example1 (Gist here). Lievi differenze rispetto all'esempio pubblicato includono i parametri di connessione e la rimozione della chiamata lb/publish (dato che stiamo riempiendo i dati con una fonte esterna).

Possiamo consumare correttamente i dati dalla coda e attendere altri messaggi. Tuttavia, quando si riparte RMQ (tramite il sopra sudo comando sul VM di hosting RabbitMQ), la seguente eccezione viene generata:

Caught an exception during connection recovery! 
java.io.IOException 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) 
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516) 
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:545) 
    at com.novemberain.langohr.Connection.recoverConnection(Connection.java:166) 
    at com.novemberain.langohr.Connection.beginAutomaticRecovery(Connection.java:115) 
    at com.novemberain.langohr.Connection.access$000(Connection.java:18) 
    at com.novemberain.langohr.Connection$1.shutdownCompleted(Connection.java:93) 
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573) 
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException 
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) 
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) 
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) 
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321) 
    ... 8 more 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:273) 
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) 
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533) 

Sembra probabile che il meccanismo di riavvio previsto fornito da Langohr è rottura quando entra in gioco. Esiste un modello alternativo che è preferibile nel caso di questi riavvii "duri"? In alternativa, suppongo che dobbiamo implementare il monitoraggio della connessione e riprovare noi stessi. Qualsiasi suggerimento sarebbe il benvenuto.

risposta

2

Abbiamo visto queste tracce dello stack, ma non le vediamo più con Langohr 2.9.0. Dopo un riavvio, i nostri client clojure si riconnettono e i messaggi ricominciano a scorrere.

Stiamo utilizzando le impostazioni predefinite, che hanno una copertura di connessione e la topologia acceso, come dimostra questo:

(infof "Automatic recovery enabled? %s" (rmq/automatic-recovery-enabled? connection)) 
(infof "Topology recovery enabled? %s" (rmq/automatic-topology-recovery-enabled? connection))