Sto provando a creare un cluster statico di due applicazioni Spring Boot con server HornetQ incorporati. Un'applicazione/server gestirà eventi esterni e genererà messaggi da inviare a una coda di messaggi. L'altra applicazione/server sarà in ascolto sulla coda dei messaggi ed elaborerà i messaggi in arrivo. Poiché il collegamento tra le due applicazioni non è affidabile, ciascuno utilizzerà solo client locali/inVM per produrre/consumare messaggi sui rispettivi server e fare affidamento sulla funzionalità di clustering per inoltrare i messaggi alla coda sull'altro server nel cluster.Spring Boot embedded Il cluster HornetQ non inoltra i messaggi
Sto utilizzando il HornetQConfigurationCustomizer
per personalizzare il server HornetQ incorporato, perché per impostazione predefinita viene fornito solo con InVMConnectorFactory
.
Ho creato un paio di applicazioni di esempio che illustrano questa configurazione, in tutto questo esempio "ServerSend", fa riferimento al server che produrrà messaggi e "ServerReceive" si riferisce al server che utilizzerà i messaggi.
pom.xml per applicazioni contiene:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jms-server</artifactId>
</dependency>
DemoHornetqServerSendApplication:
@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${spring.hornetq.embedded.queues}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqServerSendApplication.class, args);
}
@Scheduled(fixedRate = 5000)
private void sendMessage() {
String message = "Timestamp from Server: " + System.currentTimeMillis();
System.out.println("Sending message: " + message);
jmsTemplate.convertAndSend(testQueue, message);
}
@Bean
public HornetQConfigurationCustomizer hornetCustomizer() {
return new HornetQConfigurationCustomizer() {
@Override
public void customize(Configuration configuration) {
String serverSendConnectorName = "server-send-connector";
String serverReceiveConnectorName = "server-receive-connector";
Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5445");
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverSendConnectorName, tc);
Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
acceptors.add(tc);
params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5446");
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverReceiveConnectorName, tc);
List<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(serverReceiveConnectorName);
ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
"my-cluster", // name
"jms", // address
serverSendConnectorName, // connector name
500, // retry interval
true, // duplicate detection
true, // forward when no consumers
1, // max hops
1000000, // confirmation window size
staticConnectors,
true // allow direct connections only
);
configuration.getClusterConfigurations().add(conf);
AddressSettings setting = new AddressSettings();
setting.setRedistributionDelay(0);
configuration.getAddressesSettings().put("#", setting);
}
};
}
}
application.properties (ServerSend):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
DemoHornetqServerReceiveApplication:
@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${spring.hornetq.embedded.queues}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
}
@JmsListener(destination="${spring.hornetq.embedded.queues}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
@Bean
public HornetQConfigurationCustomizer hornetCustomizer() {
return new HornetQConfigurationCustomizer() {
@Override
public void customize(Configuration configuration) {
String serverSendConnectorName = "server-send-connector";
String serverReceiveConnectorName = "server-receive-connector";
Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5446");
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverReceiveConnectorName, tc);
Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
acceptors.add(tc);
params = new HashMap<String, Object>();
params.put(TransportConstants.HOST_PROP_NAME, "localhost");
params.put(TransportConstants.PORT_PROP_NAME, "5445");
tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
connectorConf.put(serverSendConnectorName, tc);
List<String> staticConnectors = new ArrayList<String>();
staticConnectors.add(serverSendConnectorName);
ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
"my-cluster", // name
"jms", // address
serverReceiveConnectorName, // connector name
500, // retry interval
true, // duplicate detection
true, // forward when no consumers
1, // max hops
1000000, // confirmation window size
staticConnectors,
true // allow direct connections only
);
configuration.getClusterConfigurations().add(conf);
AddressSettings setting = new AddressSettings();
setting.setRedistributionDelay(0);
configuration.getAddressesSettings().put("#", setting);
}
};
}
}
application.properties (ServerReceive):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
Dopo l'avvio entrambe le applicazioni, accedere output mostra questo:
ServerSend:
2015-04-09 11:11: 58.471 INFO 7536 --- [main] org.hornetq.core.server: HQ221000: il server live inizia con la configurazione HornetQ Configuration (clustered = true, backup = false, sharedStore = true, journalDirectory = C: \ Users **** \ AppData \ Loca l \ Temp \ hornetq-data/journal, bindingsDirectory = data/bindings, largeMessagesDirectory = data/largemessages, pagingDirectory = data/paging)
2015-04-09 11: 11: 58.501 INFO 7536 --- [main] org. hornetq.core.server: HQ221045: libaio non è disponibile, passaggio della configurazione in NIO
2015-04-09 11: 11: 58.595 INFO 7536 --- [main] org.hornetq.core.server: HQ221043: Aggiunta del protocollo supporto CORE
2015-04-09 11: 11: 58.720 INFO 7536 --- [main] org.hornetq.core.server: HQ221003: tentativo di distribuzione della coda jms.queue.jms.testqueue
2015-04-09 11: 11: 59.568 INFO 7536 --- [main] org.hornetq.core.server: HQ221020: Avviata Netty Acceptor versione 4.0.13.Indirizzo locale finale: 5445
2015-04-09 11: 1 1: 59.593 INFO 7536 --- [main] org.hornetq.core.server: HQ221007: il server è ora attivo
2015-04-09 11: 11: 59.593 INFO 7536 --- [main] org.hornetq.core .server: HQ221001: HornetQ Server versione 2.4.5.FINALE (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]
ServerReceive:
2015-04-09 11: 12: 04,401 INFO 4528 --- [principale] org.hornetq.core.server: HQ221000: il server live inizia con la configurazione HornetQ Configuration (clustered = true, backup = false, sharedStore = true, journalDirectory = C: \ Users **** \ AppData \ Local \ Temp \ hornetq- dati/journal, bindingsDirectory = dati/attacchi, largeMessagesDirectory = dati/largemessages, pagingDirectory = dati/paging)
2015/04/09 11: 12: 04,410 INFO 4528 --- [principale] org.hornetq.core.server: HQ221045: libaio non è disponibile, cambiando il file configurazione in NIO
2015-04-09 11: 12: 04.520 INFO 4528 --- [main] org.hornetq.core.server: HQ221043: Aggiunta del supporto protocollo CORE
2015-04-09 11: 12: 04,629 INFO 4528 --- [principale] org.hornetq.core.server: HQ221003: cercando di distribuire coda jms.queue.jms.testqueue
2015/04/09 11: 12: 05,545 INFO 4528 --- [principale] org. hornetq.core.server: HQ221020: Iniziato versione Netty Acceptor 4.0.13.Final localhost: 5446
2015/04/09 11: 12: 05,578 INFO 4528 --- [principale] org.hornetq.core.server: HQ221007: Server è ora vivono
2015/04/09 11: 12: 05,578 INFO 4528 --- [principale] org.hornetq.core.server: HQ221001: versione HornetQ Server 2.4.5.FINAL (Wild Hornet, 124) [c139929d -d90f-11e4-ba2e -e58abf5d6944]
vedo clustered=true
in entrambe le uscite, e questo sarebbe mostrare false
se ho rimosso la configurazione cluster dal HornetQConfigurationCustomizer
, quindi deve avere qualche effetto.
Ora, ServerSend mostra questo l'uscita della console:
messaggio Invio: Timestamp dal server: 1.428.574,32491 milioni
messaggio Invio: Timestamp dal server: 1.428.574,329899 millions
messaggio Invio: Timestamp dal server: 1.428.574,334904 millions
Tuttavia, ServerReceive non mostra nulla.
Sembra che i messaggi non vengano inoltrati da ServerSend a ServerReceive.
ho fatto un po 'di test, con la creazione di due ulteriori applicazioni Spring Boot (ClientSend e ClientReceive), che fare non hanno un server HornetQ incorporato e invece connettersi a un server "nativo".
pom.xml per entrambe le applicazioni client contiene:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
DemoHornetqClientSendApplication:
@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${queue}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqClientSendApplication.class, args);
}
@Scheduled(fixedRate = 5000)
private void sendMessage() {
String message = "Timestamp from Client: " + System.currentTimeMillis();
System.out.println("Sending message: " + message);
jmsTemplate.convertAndSend(testQueue, message);
}
}
application.properties (ClientSend):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446
queue=jms.testqueue
DemoHornetqClientReceiveApplication:
@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {
@Autowired
private JmsTemplate jmsTemplate;
private @Value("${queue}") String testQueue;
public static void main(String[] args) {
SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
}
@JmsListener(destination="${queue}")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
applicazione.proprietà (ClientReceive):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445
queue=jms.testqueue
Ora la console mostra questo:
ServerReveive:
messaggio Received: Timestamp dal Cliente: 1.428.574,96663 milioni
messaggio Received: Timestamp dal Cliente: 1.428.574,9716 milioni
Messaggio ricevuto: Data/ora dal cliente: 1428574976595
ClientReceive:
messaggio Received: Timestamp dal server: 1.428.574,969436 millions
messaggio Received: Timestamp dal server: 1.428.574,974438 millions
messaggio Received: Timestamp dal server: 1428574979446
Se ho ServerSend corsa per un po ', e poi avvia ClientReceive, riceve anche tutti i messaggi in coda fino a quel punto, quindi questo mostra che i messaggi non spariscono da qualche parte, o vengono consumati da qualche altra parte.
Per completezza ho anche indicato ClientSend a ServerSend e ClientReceive a ServerReceive, per vedere se c'è qualche problema con il clustering e i client InVM, ma ancora non c'era alcun output che indica che è stato ricevuto alcun messaggio in ClientReceive o in ClientReceive. ServerReceive.
Quindi sembra che il recapito dei messaggi da/per ciascun broker incorporato a client esterni direttamente connessi funzioni correttamente, ma nessun messaggio viene inoltrato tra i broker nel cluster.
Quindi, dopo tutto questo, la grande domanda, cosa c'è di sbagliato con l'installazione che i messaggi non vengono inoltrati all'interno del cluster?
Hai trovato la risposta a questa domanda nel mese scorso? Non ho trovato nulla su Internet (eccetto questo post) sulla comunicazione di avvio a molla hornetq tra due processi ... –
@ deepdownunder2222 non ho avuto fortuna finora, ho avuto qualche successo con il clustering di due broker esterni in questo modo, ma anche quello non ha funzionato al 100% come mi aspettavo, quindi l'ho parcheggiato per il momento. Sono riuscito a fare ciò che mi serviva con ActiveMq, ma non ha ancora superato la fase di prototipo. Quello che ho descritto nella domanda non sembra essere una cosa popolare da fare, non c'è ancora stato molto interesse. –
Hai invece cercato di creare un bridge, l'architettura sembra allineare meglio a ciò che stai cercando di ottenere: http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/ html/core-bridges.html – grahamrb