我正在深入研究Websphere MQ作爲Spark流的數據源的可能性,因爲它是我們的一個用例所需要的。 我知道MQTT是支持來自MQ數據結構的通信的協議,但由於我是一個引發流媒體的新手,我需要一些相同的工作示例。 有沒有人嘗試用火花流連接MQ。請設計出這樣做的最佳方式。Websphere MQ作爲Apache Spark流的數據源
13
A
回答
3
所以,我在這裏張貼了CustomMQReceiver其連接WebSphere MQ和讀取工作代碼數據:
public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;
Enumeration enumeration =null;
public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
super(StorageLevel.MEMORY_ONLY_2());
this.host = host;
this.port = port;
this.qm=qm;
this.qn=qn;
this.channel=channel;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
try {
initConnection();
receive();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
System.out.print("Started receiving messages from MQ");
try {
JMSMessage receivedMessage= null;
while (!isStopped() && enumeration.hasMoreElements())
{
receivedMessage= (JMSMessage) enumeration.nextElement();
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
// Restart in an attempt to connect again when server is active again
//restart("Trying to connect again");
stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");
}
catch(Exception e)
{
e.printStackTrace();
restart("Trying to connect again");
}
catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
public void initConnection() throws JMSException
{
MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
conFactory.setHostName(host);
conFactory.setPort(port);
conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
conFactory.setQueueManager(qm);
conFactory.setChannel(channel);
qCon= (MQQueueConnection) conFactory.createQueueConnection();
MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
MQQueue queue=(MQQueue) qSession.createQueue(qn);
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
qCon.start();
enumeration= browser.getEnumeration();
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}
}
1
我相信你可以使用JMS連接來連接的WebSphere MQ和Apache駱駝可以使用連接到Websphere MQ。您可以創建自定義接收器像這樣(請注意,這種模式也沒有JMS使用):
class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable {
//Transient as this will get passed to the Workers from the Driver
@transient
var camelContextOption: Option[DefaultCamelContext] = None
def onStart() = {
camelContextOption = Some(new DefaultCamelContext())
val camelContext = camelContextOption.get
val env = new Properties()
env.setProperty("java.naming.factory.initial", "???")
env.setProperty("java.naming.provider.url", jndiProviderURL)
env.setProperty("com.webmethods.jms.clientIDSharing", "true")
val namingContext = new InitialContext(env); //using the properties file to create context
//Lookup Connection Factory
val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory]
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val builder = new RouteBuilder() {
def configure() = {
from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10")
.process(new Processor() {
def process(exchange: Exchange) = {
exchange.getIn.getBody match {
case s: String => store(s)
}
}
})
}
}
}
builders.foreach(camelContext.addRoutes)
camelContext.start()
}
def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop()
}
然後,您可以創建活動的DSTREAM像這樣:
val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))
相關問題
- 1. IBM Websphere MQ轉換爲Apache Apex操作員流?
- 2. Apache Camel - Websphere MQ集成
- 3. Apache Spark RDD工作流程
- 4. 啓用爲WebSphere MQ
- 5. 在Apache Spark中加入流式數據
- 6. MQ WebSphere MessageID Woes
- 7. 將Apache ServiceMix連接到Websphere MQ
- 8. IBM Websphere MQ Monitoring
- 9. WebSphere MQ的QueueConnectionFactory
- 10. WebSphere MQ中的MQRC資源問題
- 11. WebSphere MQ性能
- 12. Apache Spark流和Spring XD流
- 13. 加入流Apache Spark
- 14. Apache Spark(scala)+ python/R數據分析工作流程
- 15. WebSphere MQ資源管理和AMQ8077錯誤
- 16. AMQP vs Websphere MQ
- 17. Websphere MQ集羣
- 18. 登錄WebSphere MQ
- 19. WebSphere MQ .NET API
- 20. Mdb Glasswish Websphere MQ
- 21. Spring-JMS(Websphere MQ)
- 22. Spark Apache源代碼
- 23. 集羣的WebSphere MQ
- 24. 的WebSphere MQ消息
- 25. Apache Spark應用程序工作流程
- 26. WebSphere中的數據源
- 27. Spring JMS和Websphere MQ
- 28. Websphere MQ系列 - MQGMO_WAIT
- 29. Websphere MQ - Java連接
- 30. C#IBM MQ WEBSPHERE MQRC_NOT_AUTHORIZED
投票關閉爲因爲它不適合Stack Overflow的問題指南。我建議在http://mqseries.net或其他在線MQ論壇上提出這些廣泛的體系結構和可行性問題。 –
我認爲這可能只是一個措辭問題。而不是模糊的「我正在研究這件事情,最好的解決方案是什麼?」你可以直接提出問題。 _「如何通過Apache Spark從Websphere MQ讀取數據?」_如果您對該問題的Websphere MQ更加了解,則可以添加更多關於該問題的信息。它支持SQL嗎?你通常如何查詢它?它有什麼客戶存在?然後有人知道Spark可能可以幫助你。 –