2013-10-08 7 views
5

Desidero elaborare i messaggi da una coda rabbitMq in parallelo. La coda è configurata per essere autoAck = false. Sto usando il supporto camel-rabbitMQ per camel endpoints, che supporta un parametro threadPoolSize, ma questo non ha l'effetto desiderato. I messaggi vengono ancora elaborati in serie dalla coda, anche quando threadpoolsize = 20.Consumo parallelo client java Rabbit Mq

Dal debug tramite il codice, è possibile vedere che il parametro threadpoolsize viene utilizzato per creare un ExecutorService che viene utilizzato per passare alla connectionfactory di coniglio come descritto here. Tutto sembra a posto finchè non entri nel coniglio ConsumerWorkService. Qui i messaggi vengono elaborati in blocchi di messaggi di dimensione massima 16. Ogni messaggio in un blocco viene elaborato in serie e quindi se c'è più lavoro da fare, il servizio executor viene richiamato con il blocco successivo. Un frammento di codice di questo è sotto. Da questo uso del servizio executor non riesco a vedere come i messaggi possano essere elaborati in parallelo. Il servizio executors ha sempre un solo pezzo da eseguire alla volta.

Cosa mi manca? documentazione

private final class WorkPoolRunnable implements Runnable { 

     public void run() { 
      int size = MAX_RUNNABLE_BLOCK_SIZE; 
      List<Runnable> block = new ArrayList<Runnable>(size); 
      try { 
       Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); 
       if (key == null) return; // nothing ready to run 
       try { 
        for (Runnable runnable : block) { 
         runnable.run(); 
        } 
       } finally { 
        if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { 
         ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); 
        } 
       } 
      } catch (RuntimeException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
+0

È possibile configurare ConsumerWorkService per utilizzare una dimensione di blocco diversa? –

+1

Ciao Claus, ho apportato alcune modifiche al componente Camel-rabbitmq tramite Github come Fergus Nelson. Ho apportato delle modifiche a RabbitMqConsumer per configurare un canale per ciascun consumatore interessato richiesto. Creerò una richiesta di pull Jira + quando avrò testato tutto. –

+0

@ mR_fr0g, come ho capito, hai risolto il problema creando più canali nel componente Camel-RabbitMQ. Potresti fornire il link al tuo ticket Jira, tirare la richiesta e specificare in quale versione di Camel è presente la correzione? – wheleph

risposta

3

di RabbitMQ non è molto chiaro su questo, ma, anche se il ConsumerWorkService sta usando un pool di thread, questo pool non sembra per essere utilizzato in un modo per elaborare i messaggi in parallelo:

Ogni canale ha il proprio thread di invio. Per il caso d'uso più comune di un Consumatore per Canale, ciò significa che i Consumatori non intrattengono altri Consumatori. Se disponi di più consumatori per canale, tieni presente che un consumatore di lunga data potrebbe ritardare l'invio di callback ad altri consumatori su quel canale.

(http://www.rabbitmq.com/api-guide.html)

Questa documentazione suggerisce di utilizzare uno Channel per thread e, in effetti, se è sufficiente creare il maggior numero di Channel s come il livello di concorrenza, i messaggi verranno inviati tra i consumatori legati alla questi canali.

Ho provato con 2 canali e utenti: quando 2 messaggi sono in coda, ogni utente sceglie solo un messaggio alla volta. I blocchi di 16 messaggi che hai citato non sembrano interferire, ed è una buona cosa.

In effetti, Spring AMQP crea anche diversi canali per elaborare i messaggi contemporaneamente. Questo viene fatto:

Ho anche provato questo per funzionare come previsto.

3

Se si dispone di una singola istanza Channel, invierà i suoi utenti registrati in serie come è stato rilevato correttamente esaminando ConsumerWorkService. Esistono due modi per superarlo:

  1. Utilizzare più canali anziché uno.
  2. Utilizzare il canale singolo ma attuare gli utenti in un modo speciale. Dovrebbero semplicemente selezionare il messaggio in arrivo dalla coda e inserirlo come attività in un pool di thread interno.

È possibile trovare ulteriori dettagli in this post.