Desidero elaborare i messaggi da una coda rabbitMq in parallelo. La coda è configurata per essere autoAck = false. Sto usando il supporto camel-rabbitMQ per camel endpoints, che supporta un parametro threadPoolSize, ma questo non ha l'effetto desiderato. I messaggi vengono ancora elaborati in serie dalla coda, anche quando threadpoolsize = 20.Consumo parallelo client java Rabbit Mq
Dal debug tramite il codice, è possibile vedere che il parametro threadpoolsize viene utilizzato per creare un ExecutorService che viene utilizzato per passare alla connectionfactory di coniglio come descritto here. Tutto sembra a posto finchè non entri nel coniglio ConsumerWorkService
. Qui i messaggi vengono elaborati in blocchi di messaggi di dimensione massima 16. Ogni messaggio in un blocco viene elaborato in serie e quindi se c'è più lavoro da fare, il servizio executor viene richiamato con il blocco successivo. Un frammento di codice di questo è sotto. Da questo uso del servizio executor non riesco a vedere come i messaggi possano essere elaborati in parallelo. Il servizio executors ha sempre un solo pezzo da eseguire alla volta.
Cosa mi manca? documentazione
private final class WorkPoolRunnable implements Runnable {
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
È possibile configurare ConsumerWorkService per utilizzare una dimensione di blocco diversa? –
Ciao Claus, ho apportato alcune modifiche al componente Camel-rabbitmq tramite Github come Fergus Nelson. Ho apportato delle modifiche a RabbitMqConsumer per configurare un canale per ciascun consumatore interessato richiesto. Creerò una richiesta di pull Jira + quando avrò testato tutto. –
@ mR_fr0g, come ho capito, hai risolto il problema creando più canali nel componente Camel-RabbitMQ. Potresti fornire il link al tuo ticket Jira, tirare la richiesta e specificare in quale versione di Camel è presente la correzione? – wheleph