2015-05-25 49 views
13

我正在深入研究Websphere MQ作爲Spark流的數據源的可能性,因爲它是我們的一個用例所需要的。 我知道MQTT是支持來自MQ數據結構的通信的協議,但由於我是一個引發流媒體的新手,我需要一些相同的工作示例。 有沒有人嘗試用火花流連接MQ。請設計出這樣做的最佳方式。Websphere MQ作爲Apache Spark流的數據源

+1

投票關閉爲因爲它不適合Stack Overflow的問題指南。我建議在http://mqseries.net或其他在線MQ論壇上提出這些廣泛的體系結構和可行性問題。 –

+0

我認爲這可能只是一個措辭問題。而不是模糊的「我正在研究這件事情,最好的解決方案是什麼?」你可以直接提出問題。 _「如何通過Apache Spark從Websphere MQ讀取數據?」_如果您對該問題的Websphere MQ更加了解,則可以添加更多關於該問題的信息。它支持SQL嗎?你通常如何查詢它?它有什麼客戶存在?然後有人知道Spark可能可以幫助你。 –

回答

3

所以,我在這裏張貼了CustomMQReceiver其連接WebSphere MQ和讀取工作代碼數據:

public class CustomMQReciever extends Receiver<String> { String host = null; 
int port = -1; 
String qm=null; 
String qn=null; 
String channel=null; 
transient Gson gson=new Gson(); 
transient MQQueueConnection qCon= null; 

Enumeration enumeration =null; 

public CustomMQReciever(String host , int port, String qm, String channel, String qn) { 
    super(StorageLevel.MEMORY_ONLY_2()); 
    this.host = host; 
    this.port = port; 
    this.qm=qm; 
    this.qn=qn; 
    this.channel=channel; 

} 

public void onStart() { 
    // Start the thread that receives data over a connection 
    new Thread() { 
     @Override public void run() { 
      try { 
       initConnection(); 
       receive(); 
      } 
      catch (JMSException ex) 
      { 
       ex.printStackTrace(); 
      } 
     } 
    }.start(); 
} 
public void onStop() { 
    // There is nothing much to do as the thread calling receive() 
    // is designed to stop by itself isStopped() returns false 
} 

/** Create a MQ connection and receive data until receiver is stopped */ 
private void receive() { 
    System.out.print("Started receiving messages from MQ"); 

    try { 

    JMSMessage receivedMessage= null; 

     while (!isStopped() && enumeration.hasMoreElements()) 
     { 

      receivedMessage= (JMSMessage) enumeration.nextElement(); 
      String userInput = convertStreamToString(receivedMessage); 
      //System.out.println("Received data :'" + userInput + "'"); 
      store(userInput); 
     } 

     // Restart in an attempt to connect again when server is active again 
     //restart("Trying to connect again"); 

     stop("No More Messages To read !"); 
     qCon.close(); 
     System.out.println("Queue Connection is Closed"); 

    } 
    catch(Exception e) 
    { 
     e.printStackTrace(); 
     restart("Trying to connect again"); 
    } 
    catch(Throwable t) { 
     // restart if there is any other error 
     restart("Error receiving data", t); 
    } 
    } 

    public void initConnection() throws JMSException 
{ 
    MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory(); 
    conFactory.setHostName(host); 
    conFactory.setPort(port); 
    conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); 
    conFactory.setQueueManager(qm); 
    conFactory.setChannel(channel); 


    qCon= (MQQueueConnection) conFactory.createQueueConnection(); 
    MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1); 
    MQQueue queue=(MQQueue) qSession.createQueue(qn); 
    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue); 
    qCon.start(); 

    enumeration= browser.getEnumeration(); 
    } 

@Override 
public StorageLevel storageLevel() { 
    return StorageLevel.MEMORY_ONLY_2(); 
} 
} 
1

我相信你可以使用JMS連接來連接的WebSphere MQ和Apache駱駝可以使用連接到Websphere MQ。您可以創建自定義接收器像這樣(請注意,這種模式也沒有JMS使用):

class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable { 
    //Transient as this will get passed to the Workers from the Driver 
    @transient 
    var camelContextOption: Option[DefaultCamelContext] = None 

    def onStart() = { 
    camelContextOption = Some(new DefaultCamelContext()) 
    val camelContext = camelContextOption.get 
    val env = new Properties() 
    env.setProperty("java.naming.factory.initial", "???") 
    env.setProperty("java.naming.provider.url", jndiProviderURL) 
    env.setProperty("com.webmethods.jms.clientIDSharing", "true") 
    val namingContext = new InitialContext(env); //using the properties file to create context 

    //Lookup Connection Factory 
    val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory] 
    camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)) 

    val builder = new RouteBuilder() { 
     def configure() = { 
      from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10") 
      .process(new Processor() { 
      def process(exchange: Exchange) = { 
       exchange.getIn.getBody match { 
       case s: String => store(s) 
       } 
      } 
      }) 
     } 
     } 
    } 
    builders.foreach(camelContext.addRoutes) 
    camelContext.start() 
    } 

    def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop() 
} 

然後,您可以創建活動的DSTREAM像這樣:

val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))