2011-03-08 16 views
8

Abbiamo una coda JMS che riceve un numero molto elevato di messaggi.Elaborazione JMS efficace

Il listener deve salvare il messaggio nel database utilizzando una transazione di database e quindi eseguire il commit della transazione JMS.

Quindi, come posso farlo in modo più efficace dove non devo fare il database & JMS commit su ogni messaggio.

risposta

6

La premessa alla base della messaggistica asincrona, specialmente quando si usa un MDB, è che ogni messaggio è atomico. Vale a dire, il risultato dell'elaborazione di qualsiasi messaggio dovrebbe essere indipendente dal risultato dell'elaborazione di qualsiasi altro messaggio. La soluzione ideale al tuo problema preserverà questa atomicità dei messaggi.

Se si dovessero elaborare più messaggi nella stessa unità di lavoro, si perderebbe questa atomicità. Ad esempio, supponi di aver deciso di sincronizzare ogni 25 messaggi. Se il 25 ° messaggio ha avuto un errore, ad esempio un problema di conversione della codepage che ne impediva il recupero dalla coda, l'intero batch di messaggi sarebbe stato annullato. Sarebbero quindi stati riconsegnati. Il conteggio di riconsegna dei messaggi aumenterebbe con ogni ciclo di lettura/backout. Una volta che il conteggio di riconsegna ha superato la soglia impostata nel server dell'app, tutti i 25 messaggi verranno scartati o riacquistati, a seconda della configurazione. Più grande è il batch, più messaggi sono potenzialmente interessati in una situazione di errore perché l'intero batch vive o muore insieme. Imposta la dimensione del tuo batch su 100 e 100 messaggi saranno a rischio in caso di un singolo messaggio veleno.

Una soluzione alternativa consiste nel consentire numerosi thread di elaborazione nel proprio MDB. Con JMS puoi generare molte sessioni sotto la stessa connessione. Ogni sessione può gestire la propria unità di lavoro, pertanto ciascuna sessione può avviare autonomamente una transazione XA, ottenere un messaggio, aggiornare il database e quindi eseguire il commit della transazione. Se un messaggio non funziona, solo il messaggio e l'aggiornamento del database sono interessati.

Ci sono delle eccezioni. Ad esempio, se si elabora un batch di grandi dimensioni e tutti i messaggi provengono dallo stesso produttore, è comune utilizzare qualcosa di diverso da un MDB per recuperare molti messaggi e aggiornare molte righe con la stessa unità di lavoro. Allo stesso modo, se i messaggi dipendono dalla sequenza, l'elaborazione parallela non è possibile perché non preserverebbe la sequenza. Ma poi di nuovo, i messaggi dipendenti dalla sequenza non sono atomici. Ancora una volta, in questo caso un MDB non è la soluzione ideale.

A seconda del provider di servizi di trasporto, il numero di thread supportati può essere limitato solo dall'archiviazione della memoria. Ad esempio, WebSphere MQ può gestire facilmente centinaia di thread getter simultanei su una coda. Controlla l'ottimizzazione della configurazione MDB del tuo server delle app per vedere quanti thread puoi far ruotare e quindi verificare che il tuo trasporto possa gestire il carico. Quindi giocare un po 'per trovare il numero ottimale di thread. Le prestazioni aumenteranno notevolmente man mano che i thread aumentano da uno, ma solo fino a un certo punto. Passato quel punto si vede generalmente un plateau, quindi un calo in quanto la gestione dei thread overhead compensa i guadagni in termini di prestazioni. Dove si trova lo spot swe3et dipende da quanto pesantemente viene caricato il broker di messaggistica e se è più vincolato da CPU, memoria, disco o rete.

8

Non farlo su ogni messaggio, farlo in lotti. JMS supporta le transazioni proprio come fa il tuo DB; avviare una transazione JMS, leggere N messaggi. Avvia transazione DB, inserisci N messaggi. Commit to JMS, commit to DB.

Questo ovviamente introduce una finestra in cui si verifica una corsa (si verifica un arresto anomalo tra i due commit). Lo hai ora, ma solo per un singolo messaggio. Se vuoi risolvere questo problema, devi considerare le transazioni XA (commit a due fasi) o almeno una sorta di schema di rilevamento duplicato. Per alcune intro, date un'occhiata a: http://activemq.apache.org/should-i-use-xa.html

+0

Il metodo di richiamata "onMessage" restituisce solo un messaggio alla volta, quindi come posso recuperare N messaggi. – changed

+3

Non vorrei usare l'interfaccia MessageListener e fare solo cose quando ho ricevuto un messaggio. Potresti farlo (tieni traccia di quanti messaggi hai ricevuto tramite una variabile membro, avvia e commetti transazioni, ecc.) Ma stai estendendo la finestra della condizione di gara perché stai facendo affidamento su un messaggio per attivare un'azione. Non è davvero l'approccio migliore. È molto meglio fare un ciclo tradizionale in cui si leggono i messaggi fuori dalla coda (blocco della chiamata con timeout o non-block) e si commette quando si hanno N messaggi o Y volta. –

+1

Siamo spiacenti, in particolare stiamo utilizzando i metodi MessageConsumer Interface ricevuti (timeout) e receiveNoWait() anziché registrare MessageListener con setMessageListener(). –

0

Ecco un processore jms che preleverà i messaggi da una coda, li aggiungerà a un elenco e passerà a un'altra coda. È possibile regolare come i valori vengono letti e aggregati nei rispettivi metodi:

public class JmsBatcher<T> { 
    final Session session; 
    private final MessageConsumer consumer; 
    private final MessageProducer producer; 
    private final int batchSize; 


    public JmsBatcher(final Connection connection, 
         final String sourceQueue, 
         final String destQueue, 
         final int batchSize) throws JMSException { 
     this.batchSize = batchSize; 
     session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
     final Queue source = session.createQueue(sourceQueue); 
     final Queue dest = session.createQueue(destQueue); 
     consumer = session.createConsumer(source); 
     producer = session.createProducer(dest); 
    } 

    public void processBatch() { 
     final List<T> values = new ArrayList<>(); 
     try { 
      while (values.size() < batchSize) { 
       final Message message = consumer.receive(); 
       values.add(readObject(message)); 
       message.acknowledge(); 
      } 
      producer.send(createAggregate(values)); 
      session.commit(); 
     } catch (Exception e) { 
      // Log the exception 
      try { 
       session.rollback(); 
      } catch (JMSException re) { 
       // Rollback failed, so something fataly wrong. 
       throw new IllegalStateException(re); 
      } 
     } 
    } 

    private Message createAggregate(final List<T> values) throws JMSException { 
     return session.createObjectMessage((Serializable) values); 
    } 

    private T readObject(final Message message) throws JMSException { 
     return (T) ((ObjectMessage) message).getObject(); 
    } 
} 

Questo può essere avviato in un thread separato, e solo correre per sempre:

final JmsBatcher jmsBatcher = 
    new JmsBatcher(connection, "single", "batch", 25); 
new Thread(() -> { 
    while (true) { 
     jmsBatcher.processBatch(); 
    } 
}).start(); 

si può quindi impegnarsi per il database in lotti dai risultati in pila. In caso di errori, la transazione verrà ritentata.