2012-01-15 16 views
5

Ho un sistema che implementa Camel e ActiveMQ per la comunicazione tra alcuni server. Vorrei sapere se esiste un modo per scadere automaticamente e cancellare i messaggi inviati a una coda dopo X periodo di tempo. Dal momento che il server di origine (riempiendo la coda) non sa se qualcuno sta raccogliendo i messaggi, non voglio che la mia coda cresca fino a che non è così grande che qualcosa si blocca. Bonus karma indica a qualcuno che può aiutare e fornire il modo java dsl per implementare questa funzione.Scadenza automatica dei messaggi in Camel

Soluzione

// expire message after 2 minutes 
long ttl = System.currentTimeMillis() + 120000; 
// send our info one-way to the group topic 
camelTemplate.sendBodyAndHeader("jms:queue:stats", ExchangePattern.InOnly, stats, "JMSExpiration", ttl); 

risposta

1

Bene il setJMSExpiration (lunga scadenza):

è qualcosa che non DEVE chiama quando sei il cliente. Vedi il mio discorso su questo un po 'sul forum ActiveMQ.

http://apache-qpid-developers.2158895.n2.nabble.com/MRG-Java-JMS-Expiration-td7171854.html

+0

Nel tuo messaggio sul forum sembrava che volessi una scadenza di 10 minuti, ma invece hai impostato 10 secondi. ex. (10 * (60 * 1000)) vs (10 * 1000) –

+0

@ No meno, volevo 10 secondi. 10 minuti è un'impostazione predefinita di Qpid quando i messaggi scaduti vengono cancellati dalle code. Ad esempio: hai inviato un messaggio alla coda con scadenza impostata su 10 secondi, quindi dopo 10 secondi i messaggi sono scaduti da QPid e nessun consumatore può consumarlo, ma di fatto il messaggio è delete bu QPid quando il Thread dei criteri di rimozione (s) kick in, e l'impostazione di default è che danno il via dopo 10 minuti – Eugene

+0

Ok, ho frainteso ciò che avevi postato lì. –

3

presente, inoltre, che gli orologi tra client - broker hanno bisogno di essere in sincronia, per la scadenza per funzionare correttamente. Se gli orologi non sono sincronizzati, il set di scadenza dal client potrebbe essere già scaduto quando il messaggio viene ricevuto sul broker. Oppure il tempo del client è più avanti del broker, quindi la scadenza è più lunga di 10 secondi.

Mi picchia un po 'perché la scadenza è basata sul tempo del cliente. Quindi AMQ offre un plug-in, per risolvere il problema riallineando il tempo, per essere solo basato sul broker. Vedi http://activemq.apache.org/timestampplugin.html

1

Da parte nostra scegliamo di aggiungere il tempo di scadenza a destinazioni specifiche utilizzando un percorso cammello distribuito nel servizio ActiveMQ stesso.

L'unica cosa da fare è creare un file XML come il seguente con il nome e.g. setJMSExpiration.xml:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation=" 
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd 
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <camelContext id="camel-set-expiration" xmlns="http://camel.apache.org/schema/spring"> 
    <!-- Copy route for each destination to expire --> 
    <route id="setJMSExpiration.my.queue.dlq"> 
     <from uri="broker:queue:MY.QUEUE.DLQ"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 1 day --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 86400000}</spel> 
     </setHeader> 
     <to uri="broker:queue:MY.QUEUE.DLQ"/> 
    </route> 
    <route id="setJMSExpiration.another.queue"> 
     <from uri="broker:queue:ANOTHER.QUEUE"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 5 days --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 432000000}</spel> 
     </setHeader> 
     <to uri="broker:queue:ANOTHER.QUEUE"/> 
    </route> 
    </camelContext> 
</beans> 

e importarlo nel vostro activemq.xml configurazione con:

<!-- Add default Expiration (file in the same directory) --> 
<import resource="setJMSExpiration.xml"/> 

In alternativa si può anche fornire specifica per destination policies se non si desidera che i messaggi scaduti per raggiungere la coda di ActiveMQ.DLQ.

<policyEntry queue="MY.QUEUE.DLQ"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 
<policyEntry queue="ANOTHER.QUEUE"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 

L'unica limitazione di questo modo è che non è possibile utilizzare facilmente i caratteri jolly come è codificato qui (è possibile, ma avrete bisogno di alcuni adattamenti utilizzando intestazione destinazione JMS nel percorso cammello).

Cerchiamo di consentire ai produttori di definire timeToLive (e forzarli il più possibile) ma non è sempre possibile costringerli a cambiare il loro codice, questo per minimizzare il numero di tali percorsi.