Stavo scavando nelle possibilità di Websphere MQ come sorgente di dati per lo spark-streaming perché è necessario in uno dei nostri casi d'uso. Devo sapere che il protocollo MQTT supporta la comunicazione dalle strutture di dati MQ, ma dal momento che sono un principiante a lanciare lo streaming ho bisogno di alcuni esempi di lavoro per lo stesso. Qualcuno ha provato a connettere l'MQ con lo streaming di scintille. Si prega di escogitare il modo migliore per farlo.Websphere MQ come origine dati per Apache Spark Streaming
13
A
risposta
3
Quindi, sto inviando qui il codice a lavorare per CustomMQReceiver che collega il Websphere MQ e legge i dati:
public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;
Enumeration enumeration =null;
public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
super(StorageLevel.MEMORY_ONLY_2());
this.host = host;
this.port = port;
this.qm=qm;
this.qn=qn;
this.channel=channel;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
try {
initConnection();
receive();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
System.out.print("Started receiving messages from MQ");
try {
JMSMessage receivedMessage= null;
while (!isStopped() && enumeration.hasMoreElements())
{
receivedMessage= (JMSMessage) enumeration.nextElement();
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
// Restart in an attempt to connect again when server is active again
//restart("Trying to connect again");
stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");
}
catch(Exception e)
{
e.printStackTrace();
restart("Trying to connect again");
}
catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
public void initConnection() throws JMSException
{
MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
conFactory.setHostName(host);
conFactory.setPort(port);
conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
conFactory.setQueueManager(qm);
conFactory.setChannel(channel);
qCon= (MQQueueConnection) conFactory.createQueueConnection();
MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
MQQueue queue=(MQQueue) qSession.createQueue(qn);
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
qCon.start();
enumeration= browser.getEnumeration();
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}
}
1
io credo che si possa utilizzare JMS per connettersi a connettersi Websphere MQ, e Apache Camel può essere utilizzato connettersi a Websphere MQ. È possibile creare un ricevitore personalizzato in questo modo (si noti che questo modello potrebbe essere utilizzato anche senza JMS):
class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable {
//Transient as this will get passed to the Workers from the Driver
@transient
var camelContextOption: Option[DefaultCamelContext] = None
def onStart() = {
camelContextOption = Some(new DefaultCamelContext())
val camelContext = camelContextOption.get
val env = new Properties()
env.setProperty("java.naming.factory.initial", "???")
env.setProperty("java.naming.provider.url", jndiProviderURL)
env.setProperty("com.webmethods.jms.clientIDSharing", "true")
val namingContext = new InitialContext(env); //using the properties file to create context
//Lookup Connection Factory
val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory]
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val builder = new RouteBuilder() {
def configure() = {
from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10")
.process(new Processor() {
def process(exchange: Exchange) = {
exchange.getIn.getBody match {
case s: String => store(s)
}
}
})
}
}
}
builders.foreach(camelContext.addRoutes)
camelContext.start()
}
def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop()
}
è quindi possibile creare una DSTREAM di eventi in questo modo:
val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))
Problemi correlati
- 1. Cluster Websphere MQ
- 2. Websphere MQ v8 - MQRC_NOT_AUTHORIZED - 2035
- 3. Come utilizzare infiniti flussi Scala come origine in Spark Streaming?
- 4. Apache Spark Streaming, Come gestire gli errori di dipendenza downstream
- 5. Test di integrazione fallito per Apache Spark Streaming
- 6. Spark Streaming - elaborazione file di dati binari
- 7. Cosa si intende per modalità binding in WebSphere MQ?
- 8. Usa websphere MQ come coordinatore di transazioni XA
- 9. Come si esplora un messaggio MQ di Websphere senza rimuoverlo?
- 10. Persistendo Spark uscita Streaming
- 11. apache spark streaming - kafka - lettura dei vecchi messaggi
- 12. Streaming di Spark Streaming Kafka
- 13. Spark Streaming stato storico
- 14. Streaming di Apache Spark vs Spring XD Stream
- 15. Spark Streaming UpdateStateByKey
- 16. Scenario di richiesta/risposta di IBM WebSphere MQ
- 17. Lambda Architecture con Apache Spark
- 18. Utilizzo di Websphere MQ con JMS da un'applicazione .NET
- 19. Streaming Spark: sicurezza dell'applicazione
- 20. Qual è la versione più recente di WebSphere MQ Client?
- 21. Collegamento ad un Websphere MQ in Java con SSL/Keystore
- 22. Apache Spark vs. Apache Storm
- 23. WebSphere MQ Low Latency Messaging - Ha un'API JMS (o JMS)?
- 24. Apache Spark vs Apache Ignite
- 25. WebSphere MQ Canale di accesso di sicurezza Domande
- 26. Qual è lo scopo del listener in WebSphere MQ?
- 27. WebSphere MQ .NET - Differenza tra IBM.XMS.dll e amqmdnet.dll
- 28. Metriche personalizzate di streaming Spark
- 29. Apache POI Streaming (SXSSF) per la lettura
- 30. Parsing json in spark-streaming
Le votazioni per chiudere come off-topic dal momento che non si adatta alle linee guida sulle domande di Stack Overflow. Suggerirei di fare queste ampie domande di architettura e fattibilità su http://mqseries.net o su uno degli altri forum MQ online. –
Penso che potrebbe essere solo un problema di fraseggio. Invece del vago _ "Stavo esaminando questa cosa. Qual è la soluzione migliore?" Potresti fare una domanda diretta. _ "Come faccio a leggere i dati da Websphere MQ tramite Apache Spark?" _ Se ne sai di più sul lato di Websphere MQ della domanda, puoi aggiungere ulteriori informazioni al riguardo. Supporta SQL? Come lo si richiede normalmente? Quali client esistono per questo? Allora qualcuno che conosce Spark può probabilmente aiutarti. –