2015-04-09 13 views
25

Sto provando a creare un cluster statico di due applicazioni Spring Boot con server HornetQ incorporati. Un'applicazione/server gestirà eventi esterni e genererà messaggi da inviare a una coda di messaggi. L'altra applicazione/server sarà in ascolto sulla coda dei messaggi ed elaborerà i messaggi in arrivo. Poiché il collegamento tra le due applicazioni non è affidabile, ciascuno utilizzerà solo client locali/inVM per produrre/consumare messaggi sui rispettivi server e fare affidamento sulla funzionalità di clustering per inoltrare i messaggi alla coda sull'altro server nel cluster.Spring Boot embedded Il cluster HornetQ non inoltra i messaggi

Sto utilizzando il HornetQConfigurationCustomizer per personalizzare il server HornetQ incorporato, perché per impostazione predefinita viene fornito solo con InVMConnectorFactory.

Ho creato un paio di applicazioni di esempio che illustrano questa configurazione, in tutto questo esempio "ServerSend", fa riferimento al server che produrrà messaggi e "ServerReceive" si riferisce al server che utilizzerà i messaggi.

pom.xml per applicazioni contiene:

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-hornetq</artifactId> 
</dependency> 
<dependency> 
    <groupId>org.hornetq</groupId> 
    <artifactId>hornetq-jms-server</artifactId> 
</dependency> 

DemoHornetqServerSendApplication:

@SpringBootApplication 
@EnableScheduling 
public class DemoHornetqServerSendApplication { 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    private @Value("${spring.hornetq.embedded.queues}") String testQueue; 

    public static void main(String[] args) { 
     SpringApplication.run(DemoHornetqServerSendApplication.class, args); 
    } 

    @Scheduled(fixedRate = 5000) 
    private void sendMessage() { 
     String message = "Timestamp from Server: " + System.currentTimeMillis(); 
     System.out.println("Sending message: " + message); 
     jmsTemplate.convertAndSend(testQueue, message); 
    } 

    @Bean 
    public HornetQConfigurationCustomizer hornetCustomizer() { 
     return new HornetQConfigurationCustomizer() { 

      @Override 
      public void customize(Configuration configuration) { 
       String serverSendConnectorName = "server-send-connector"; 
       String serverReceiveConnectorName = "server-receive-connector"; 

       Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations(); 

       Map<String, Object> params = new HashMap<String, Object>(); 
       params.put(TransportConstants.HOST_PROP_NAME, "localhost"); 
       params.put(TransportConstants.PORT_PROP_NAME, "5445"); 
       TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); 
       connectorConf.put(serverSendConnectorName, tc); 

       Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations(); 
       tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); 
       acceptors.add(tc); 

       params = new HashMap<String, Object>(); 
       params.put(TransportConstants.HOST_PROP_NAME, "localhost"); 
       params.put(TransportConstants.PORT_PROP_NAME, "5446"); 
       tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); 
       connectorConf.put(serverReceiveConnectorName, tc); 

       List<String> staticConnectors = new ArrayList<String>(); 
       staticConnectors.add(serverReceiveConnectorName); 
       ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
         "my-cluster", // name 
         "jms", // address 
         serverSendConnectorName, // connector name 
         500, // retry interval 
         true, // duplicate detection 
         true, // forward when no consumers 
         1, // max hops 
         1000000, // confirmation window size 
         staticConnectors, 
         true // allow direct connections only 
         ); 
       configuration.getClusterConfigurations().add(conf); 

       AddressSettings setting = new AddressSettings(); 
       setting.setRedistributionDelay(0); 
       configuration.getAddressesSettings().put("#", setting); 
      } 
     }; 
    } 
} 

application.properties (ServerSend):

spring.hornetq.mode=embedded 
spring.hornetq.embedded.enabled=true 
spring.hornetq.embedded.queues=jms.testqueue 
spring.hornetq.embedded.cluster-password=password 

DemoHornetqServerReceiveApplication:

@SpringBootApplication 
@EnableJms 
public class DemoHornetqServerReceiveApplication { 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    private @Value("${spring.hornetq.embedded.queues}") String testQueue; 

    public static void main(String[] args) { 
     SpringApplication.run(DemoHornetqServerReceiveApplication.class, args); 
    } 

    @JmsListener(destination="${spring.hornetq.embedded.queues}") 
    public void receiveMessage(String message) { 
     System.out.println("Received message: " + message); 
    } 

    @Bean 
    public HornetQConfigurationCustomizer hornetCustomizer() { 
     return new HornetQConfigurationCustomizer() { 

      @Override 
      public void customize(Configuration configuration) { 
       String serverSendConnectorName = "server-send-connector"; 
       String serverReceiveConnectorName = "server-receive-connector"; 

       Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations(); 

       Map<String, Object> params = new HashMap<String, Object>(); 
       params.put(TransportConstants.HOST_PROP_NAME, "localhost"); 
       params.put(TransportConstants.PORT_PROP_NAME, "5446"); 
       TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); 
       connectorConf.put(serverReceiveConnectorName, tc); 

       Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations(); 
       tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); 
       acceptors.add(tc); 

       params = new HashMap<String, Object>(); 
       params.put(TransportConstants.HOST_PROP_NAME, "localhost"); 
       params.put(TransportConstants.PORT_PROP_NAME, "5445"); 
       tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); 
       connectorConf.put(serverSendConnectorName, tc); 

       List<String> staticConnectors = new ArrayList<String>(); 
       staticConnectors.add(serverSendConnectorName); 
       ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
         "my-cluster", // name 
         "jms", // address 
         serverReceiveConnectorName, // connector name 
         500, // retry interval 
         true, // duplicate detection 
         true, // forward when no consumers 
         1, // max hops 
         1000000, // confirmation window size 
         staticConnectors, 
         true // allow direct connections only 
         ); 
       configuration.getClusterConfigurations().add(conf); 

       AddressSettings setting = new AddressSettings(); 
       setting.setRedistributionDelay(0); 
       configuration.getAddressesSettings().put("#", setting); 
      } 
     }; 
    } 
} 

application.properties (ServerReceive):

spring.hornetq.mode=embedded 
spring.hornetq.embedded.enabled=true 
spring.hornetq.embedded.queues=jms.testqueue 
spring.hornetq.embedded.cluster-password=password 

Dopo l'avvio entrambe le applicazioni, accedere output mostra questo:

ServerSend:

2015-04-09 11:11: 58.471 INFO 7536 --- [main] org.hornetq.core.server: HQ221000: il server live inizia con la configurazione HornetQ Configuration (clustered = true, backup = false, sharedStore = true, journalDirectory = C: \ Users **** \ AppData \ Loca l \ Temp \ hornetq-data/journal, bindingsDirectory = data/bindings, largeMessagesDirectory = data/largemessages, pagingDirectory = data/paging)
2015-04-09 11: 11: 58.501 INFO 7536 --- [main] org. hornetq.core.server: HQ221045: libaio non è disponibile, passaggio della configurazione in NIO
2015-04-09 11: 11: 58.595 INFO 7536 --- [main] org.hornetq.core.server: HQ221043: Aggiunta del protocollo supporto CORE
2015-04-09 11: 11: 58.720 INFO 7536 --- [main] org.hornetq.core.server: HQ221003: tentativo di distribuzione della coda jms.queue.jms.testqueue
2015-04-09 11: 11: 59.568 INFO 7536 --- [main] org.hornetq.core.server: HQ221020: Avviata Netty Acceptor versione 4.0.13.Indirizzo locale finale: 5445
2015-04-09 11: 1 1: 59.593 INFO 7536 --- [main] org.hornetq.core.server: HQ221007: il server è ora attivo
2015-04-09 11: 11: 59.593 INFO 7536 --- [main] org.hornetq.core .server: HQ221001: HornetQ Server versione 2.4.5.FINALE (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]

ServerReceive:

2015-04-09 11: 12: 04,401 INFO 4528 --- [principale] org.hornetq.core.server: HQ221000: il server live inizia con la configurazione HornetQ Configuration (clustered = true, backup = false, sharedStore = true, journalDirectory = C: \ Users **** \ AppData \ Local \ Temp \ hornetq- dati/journal, bindingsDirectory = dati/attacchi, largeMessagesDirectory = dati/largemessages, pagingDirectory = dati/paging)
2015/04/09 11: 12: 04,410 INFO 4528 --- [principale] org.hornetq.core.server: HQ221045: libaio non è disponibile, cambiando il file configurazione in NIO
2015-04-09 11: 12: 04.520 INFO 4528 --- [main] org.hornetq.core.server: HQ221043: Aggiunta del supporto protocollo CORE
2015-04-09 11: 12: 04,629 INFO 4528 --- [principale] org.hornetq.core.server: HQ221003: cercando di distribuire coda jms.queue.jms.testqueue
2015/04/09 11: 12: 05,545 INFO 4528 --- [principale] org. hornetq.core.server: HQ221020: Iniziato versione Netty Acceptor 4.0.13.Final localhost: 5446
2015/04/09 11: 12: 05,578 INFO 4528 --- [principale] org.hornetq.core.server: HQ221007: Server è ora vivono
2015/04/09 11: 12: 05,578 INFO 4528 --- [principale] org.hornetq.core.server: HQ221001: versione HornetQ Server 2.4.5.FINAL (Wild Hornet, 124) [c139929d -d90f-11e4-ba2e -e58abf5d6944]

vedo clustered=true in entrambe le uscite, e questo sarebbe mostrare false se ho rimosso la configurazione cluster dal HornetQConfigurationCustomizer, quindi deve avere qualche effetto.

Ora, ServerSend mostra questo l'uscita della console:

messaggio Invio: Timestamp dal server: 1.428.574,32491 milioni
messaggio Invio: Timestamp dal server: 1.428.574,329899 millions
messaggio Invio: Timestamp dal server: 1.428.574,334904 millions

Tuttavia, ServerReceive non mostra nulla.

Sembra che i messaggi non vengano inoltrati da ServerSend a ServerReceive.

ho fatto un po 'di test, con la creazione di due ulteriori applicazioni Spring Boot (ClientSend e ClientReceive), che fare non hanno un server HornetQ incorporato e invece connettersi a un server "nativo".

pom.xml per entrambe le applicazioni client contiene:

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-hornetq</artifactId> 
</dependency> 

DemoHornetqClientSendApplication:

@SpringBootApplication 
@EnableScheduling 
public class DemoHornetqClientSendApplication { 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    private @Value("${queue}") String testQueue; 

    public static void main(String[] args) { 
     SpringApplication.run(DemoHornetqClientSendApplication.class, args); 
    } 

    @Scheduled(fixedRate = 5000) 
    private void sendMessage() { 
     String message = "Timestamp from Client: " + System.currentTimeMillis(); 
     System.out.println("Sending message: " + message); 
     jmsTemplate.convertAndSend(testQueue, message); 
    } 
} 

application.properties (ClientSend):

spring.hornetq.mode=native 
spring.hornetq.host=localhost 
spring.hornetq.port=5446 

queue=jms.testqueue 

DemoHornetqClientReceiveApplication:

@SpringBootApplication 
@EnableJms 
public class DemoHornetqClientReceiveApplication { 
    @Autowired 
    private JmsTemplate jmsTemplate; 
    private @Value("${queue}") String testQueue; 

    public static void main(String[] args) { 
     SpringApplication.run(DemoHornetqClientReceiveApplication.class, args); 
    } 

    @JmsListener(destination="${queue}") 
    public void receiveMessage(String message) { 
     System.out.println("Received message: " + message); 
    } 
} 

applicazione.proprietà (ClientReceive):

spring.hornetq.mode=native 
spring.hornetq.host=localhost 
spring.hornetq.port=5445 

queue=jms.testqueue 

Ora la console mostra questo:

ServerReveive:

messaggio Received: Timestamp dal Cliente: 1.428.574,96663 milioni
messaggio Received: Timestamp dal Cliente: 1.428.574,9716 milioni
Messaggio ricevuto: Data/ora dal cliente: 1428574976595

ClientReceive:

messaggio Received: Timestamp dal server: 1.428.574,969436 millions
messaggio Received: Timestamp dal server: 1.428.574,974438 millions
messaggio Received: Timestamp dal server: 1428574979446

Se ho ServerSend corsa per un po ', e poi avvia ClientReceive, riceve anche tutti i messaggi in coda fino a quel punto, quindi questo mostra che i messaggi non spariscono da qualche parte, o vengono consumati da qualche altra parte.

Per completezza ho anche indicato ClientSend a ServerSend e ClientReceive a ServerReceive, per vedere se c'è qualche problema con il clustering e i client InVM, ma ancora non c'era alcun output che indica che è stato ricevuto alcun messaggio in ClientReceive o in ClientReceive. ServerReceive.

Quindi sembra che il recapito dei messaggi da/per ciascun broker incorporato a client esterni direttamente connessi funzioni correttamente, ma nessun messaggio viene inoltrato tra i broker nel cluster.

Quindi, dopo tutto questo, la grande domanda, cosa c'è di sbagliato con l'installazione che i messaggi non vengono inoltrati all'interno del cluster?

+0

Hai trovato la risposta a questa domanda nel mese scorso? Non ho trovato nulla su Internet (eccetto questo post) sulla comunicazione di avvio a molla hornetq tra due processi ... –

+0

@ deepdownunder2222 non ho avuto fortuna finora, ho avuto qualche successo con il clustering di due broker esterni in questo modo, ma anche quello non ha funzionato al 100% come mi aspettavo, quindi l'ho parcheggiato per il momento. Sono riuscito a fare ciò che mi serviva con ActiveMq, ma non ha ancora superato la fase di prototipo. Quello che ho descritto nella domanda non sembra essere una cosa popolare da fare, non c'è ancora stato molto interesse. –

+1

Hai invece cercato di creare un bridge, l'architettura sembra allineare meglio a ciò che stai cercando di ottenere: http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/ html/core-bridges.html – grahamrb

risposta

0

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

"core HornetQ è concepito come un insieme di semplici POJO quindi se si dispone di un'applicazione che richiede funzionalità di messaggistica internamente, ma non si vuole esporre che come server HornetQ è possibile creare un'istanza direttamente ed embed I server HornetQ nella tua applicazione. "

Se lo si incorpora, non lo si espone come server. Ciascuno dei tuoi contenitori ha un'istanza separata. È l'equivalente dell'avvio di 2 copie di calabrone e dando loro lo stesso nome di coda. Uno scrive su quella coda nella prima istanza e l'altro ascolta la coda nella seconda istanza.

Se si desidera disaccoppiare le app in questo modo, è necessario disporre di un singolo punto che funge da server. Probabilmente, vuoi raggruppare. Questo non è specifico per Hornet, BTW. Troverete questo modello spesso.

+0

Non sono convinto da questa risposta. 1. I documenti citati non specificano specificamente che ciò che sto cercando di fare non può essere fatto. 2. Sto già esponendo il server embedded per l'accesso esterno, ovvero collegandolo ad un client esterno, senza problemi con questo. 3. Sì, il clustering è esattamente quello che sto cercando, ed è esattamente di questa domanda, ed è anche esattamente ciò che è _non_ working –

+0

ci_ in realtà il clustering NON è quello che stai cercando. I cluster vengono utilizzati per consentire la ridondanza e per distribuire e condividere il carico. Stai tentando di utilizzare un server per inviare direttamente a un altro server. Può essere fatto? sì. È lo scopo del clustering? No. – pczeus