2012-03-26 4 views
6

Nel seguente test che sto cercando di simulare il seguente scenario: si avviaCome simulare la riconsegna del messaggio nello scenario AUTO_ACKNOWLEDGE JMS Session?

  1. Una coda di messaggi.
  2. Un utente progettato per fallire durante l'elaborazione dei messaggi viene avviato.
  3. Viene generato un messaggio.
  4. L'utente inizia a elaborare il messaggio.
  5. Durante l'elaborazione viene generata un'eccezione per simulare l'errore di elaborazione dei messaggi. Il consumatore fallito viene fermato.
  6. Un altro utente viene avviato con l'intento di prelevare il messaggio riconsegnato.

Ma il mio test ha esito negativo e il messaggio non viene riconsegnato al nuovo consumatore. Apprezzerò qualsiasi suggerimento su questo.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig", 
     loader=JavaConfigContextLoader.class) 
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests { 
    @Autowired 
    private FailureReprocessTestScenario testScenario; 

    @Before 
    public void setUp() { 
     testScenario.start(); 
    } 

    @After 
    public void tearDown() throws Exception { 
     testScenario.stop(); 
    } 

    @Test public void 
    should_reprocess_task_after_processing_failure() { 
     try { 
      Thread.sleep(20*1000); 

      assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{ 
        "task-1", 
      }))); 
     } catch (InterruptedException e) { 
      fail(); 
     } 
    } 

    @Configurable 
    public static class FailureReprocessTestScenario { 
     @Autowired 
     public BrokerService broker; 

     @Autowired 
     public MockTaskProducer mockTaskProducer; 

     @Autowired 
     public FailingWorker failingWorker; 

     @Autowired 
     public SucceedingWorker succeedingWorker; 

     @Autowired 
     public TaskScheduler scheduler; 

     public void start() { 
      Date now = new Date(); 
      scheduler.schedule(new Runnable() { 
       public void run() { failingWorker.start(); } 
      }, now); 

      Date after1Seconds = new Date(now.getTime() + 1*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { mockTaskProducer.produceTask(); } 
      }, after1Seconds); 

      Date after2Seconds = new Date(now.getTime() + 2*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { 
        failingWorker.stop(); 
        succeedingWorker.start(); 
       } 
      }, after2Seconds); 
     } 

     public void stop() throws Exception { 
      succeedingWorker.stop(); 
      broker.stop(); 
     } 
    } 

    @Configuration 
    @ImportResource(value={"classpath:applicationContext-jms.xml", 
      "classpath:applicationContext-task.xml"}) 
    public static class ContextConfig { 
     @Autowired 
     private ConnectionFactory jmsFactory; 

     @Bean 
     public FailureReprocessTestScenario testScenario() { 
      return new FailureReprocessTestScenario(); 
     } 

     @Bean 
     public MockTaskProducer mockTaskProducer() { 
      return new MockTaskProducer(); 
     } 

     @Bean 
     public FailingWorker failingWorker() { 
      TaskListener listener = new TaskListener(); 
      FailingWorker worker = new FailingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     @Bean 
     public SucceedingWorker succeedingWorker() { 
      TaskListener listener = new TaskListener(); 
      SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     private DefaultMessageListenerContainer listenerContainer(TaskListener listener) { 
      DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer(); 
      listenerContainer.setConnectionFactory(jmsFactory); 
      listenerContainer.setDestinationName("tasksQueue"); 
      listenerContainer.setMessageListener(listener); 
      listenerContainer.setAutoStartup(false); 
      listenerContainer.initialize(); 
      return listenerContainer; 
     } 

    } 

    public static class FailingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(FailingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public FailingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
     } 

     public void start() { 
      LOG.info("FailingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("FailingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("FailingWorker.processTask(" + task + ")"); 
      try { 
       Thread.sleep(1*1000); 
       throw Throwables.propagate(new Exception("Simulate task processing failure")); 
      } catch (InterruptedException e) { 
       LOG.log(Level.SEVERE, "Unexpected interruption exception"); 
      } 
     } 
    } 

    public static class SucceedingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public final List<String> processedTasks; 

     public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
      this.processedTasks = new ArrayList<String>(); 
     } 

     public void start() { 
      LOG.info("SucceedingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("SucceedingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("SucceedingWorker.processTask(" + task + ")"); 
      try { 
       TextMessage taskText = (TextMessage) task; 
       processedTasks.add(taskText.getText()); 
      } catch (JMSException e) { 
       LOG.log(Level.SEVERE, "Unexpected exception during task processing"); 
      } 
     } 
    } 

} 

TaskListener.java

public class TaskListener implements MessageListener { 

    private TaskProcessor processor; 

    @Override 
    public void onMessage(Message message) { 
     processor.processTask(message); 
    } 

    public void setProcessor(TaskProcessor processor) { 
     this.processor = processor; 
    } 

} 

MockTaskProducer.java

@Configurable 
public class MockTaskProducer implements ApplicationContextAware { 
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName()); 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    private Destination destination; 

    private int taskCounter = 0; 

    public void produceTask() { 
     LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")"); 

     taskCounter++; 

     jmsTemplate.send(destination, new MessageCreator() { 
      @Override 
      public Message createMessage(Session session) throws JMSException { 
       TextMessage message = session.createTextMessage("task-" + taskCounter); 
       return message; 
      } 
     }); 
    } 

    @Override 
    public void setApplicationContext(ApplicationContext applicationContext) 
      throws BeansException { 
     destination = applicationContext.getBean("tasksQueue", Destination.class); 
    } 
} 
+1

Quando ho impostato 'listenerContainer.setSessionTransacted (true)' Vedo che il messaggio viene riconsegnato, ma solo a 'FailingWorker'. Evento dopo aver arrestato il contenitore listener corrispondente, 'SucceedingWorker' non riceve mai il messaggio riconsegnato. –

+1

Appare il metodo 'listenerContainer.stop()' non chiude la connessione ai servizi, quindi il provider JMS continua a tentare di riconsegnare il messaggio non riuscito allo stesso utente. Per evitare che il consumatore in errore debba chiamare 'listenerContainer.shutdown()' ad un certo punto. –

risposta

7

Apparentemente la fonte della documentazione che stavo cercando ieri Creating Robust JMS Applications mi ha fuorviato in un modo (o avrei potuto capirlo in modo errato). Soprattutto che estratto:

Fino a quando un messaggio JMS è stata riconosciuta, non è considerato consumato con successo. Il consumo di successo di un messaggio avviene di norma in tre fasi.

  1. Il client riceve il messaggio.
  2. Il client elabora il messaggio.
  3. Il messaggio è confermato. Il riconoscimento viene avviato dal fornitore JMS o dal client, in base alla modalità di conferma della sessione .

ho assunto AUTO_ACKNOWLEDGE fa esattamente questo - ha riconosciuto il messaggio dopo il metodo ascoltatore restituisce un risultato. Ma secondo la specifica JMS è un po 'diverso e i contenitori di ascolto Spring come previsto non provano a modificare il comportamento dalle specifiche JMS.Questo è ciò che il javadoc di AbstractMessageListenerContainer ha da dire - ho sottolineato le frasi importanti:

Il contenitore ascoltatore offre il seguente messaggio di riconoscimento opzioni:

  • "sessionAcknowledgeMode" impostato su " AUTO_ACKNOWLEDGE "(predefinito): Riconoscimento automatico del messaggio prima dell'esecuzione del listener; nessuna riconsegna in caso di eccezione generata.
  • "sessionAcknowledgeMode" impostato su "CLIENT_ACKNOWLEDGE": conferma automatica del messaggio dopo l'esecuzione corretta del listener; no riconsegna in caso di eccezione lanciata.
  • "sessionAcknowledgeMode" impostato su "DUPS_OK_ACKNOWLEDGE": riconoscimento del messaggio pigro durante o dopo l'esecuzione del listener; potenziale riconsegna in caso di eccezione lanciata.
  • "sessionTransacted" impostato su "true": Riconoscimento transazionale dopo l'esecuzione del listener con esito positivo; riconsegna garantita in caso di eccezione generata.

Quindi la chiave per la mia soluzione è listenerContainer.setSessionTransacted(true);

Un altro problema che ho riscontrato è che il provider JMS mantiene riconsegna il messaggio non riuscito di nuovo allo stesso consumatore che non era riuscito durante l'elaborazione del messaggio. Non so se la specifica JMS fornisca una prescrizione su cosa dovrebbe fare il provider in tali situazioni, ma ciò che ha funzionato per me è stato utilizzare listenerContainer.shutdown(); per disconnettere il consumatore in errore e consentire al provider di riconsegnare il messaggio e dare una possibilità ad un altro consumatore.

Problemi correlati