1

我正在使用Spark-streaming和RabbitMQ。因此,流式作業從RabbitMQ獲取數據並應用一些轉換和操作。所以,我想知道如何在同一個流上應用多個動作(即計算兩個不同的功能集)。可能嗎?如果是,如何將流對象傳遞給代碼中提到的多個類?如何在同一個Spark Streaming上運行多個操作

  val config = ConfigFactory.parseFile(new File("SparkStreaming.conf")) 
      val conf = new SparkConf(true).setAppName(config.getString("AppName")) 
      conf.set("spark.cleaner.ttl", "120000")   

      val sparkConf = new SparkContext(conf) 
      val ssc = new StreamingContext(sparkConf, Seconds(config.getLong("SparkBatchInterval"))) 

      val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("RealTimeQueueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("QueueExchangeName"), "routingKeys" -> config.getString("QueueRoutingKey")) 
      val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams) 
      receiverStream.start() 

如何從這裏處理流:

  val objProcessFeatureSet1 = new ProcessFeatureSet1(Some_Streaming_Object) 
      val objProcessFeatureSet2 = new ProcessFeatureSet2(Some_Streaming_Object) 

      ssc.start() 
      ssc.awaitTermination() 

回答

3

您可以運行在同一DSTREAM的多個動作,如下圖所示:以上

import net.minidev.json.JSONValue 
import net.minidev.json.JSONObject 

val config = ConfigFactory.parseFile(new File("SparkStreaming.conf")) 
val conf = new SparkConf(true).setAppName(config.getString("AppName")) 
conf.set("spark.cleaner.ttl", "120000")   

val sparkConf = new SparkContext(conf) 
val ssc = new StreamingContext(sparkConf, Seconds(config.getLong("SparkBatchInterval"))) 

val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("RealTimeQueueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("QueueExchangeName"), "routingKeys" -> config.getString("QueueRoutingKey")) 
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams) 

val jsonStream = receiverStream.map(byteData => { 
    JSONValue.parse(byteData) 
}) 
jsonStream.filter(json => { 
    var customerType = json.get("customerType") 
    if(customerType.equals("consumer")) 
     true 
    else 
     false 
}).foreachRDD(rdd => { 
    rdd.foreach(json => { 
     println("json " + json) 
    }) 
}) 

jsonStream.filter(json => { 
    var customerType = json.get("customerType") 
    if(customerType.equals("non-consumer")) 
       true 
     else 
       false 
}).foreachRDD(rdd => { 
    rdd.foreach(json => { 
      println("json " + json) 
    }) 
}) 
ssc.start() 
ssc.awaitTermination() 

中的代碼片段,我首先創建從接收到的流中獲取jsonStream,然後根據客戶類型創建兩個不同的流,然後應用(foreachRDD)對他們採取行動來打印結果。

以類似的方式,您可以將相同的dstream傳遞給兩個不同的類,並在其中應用轉換和動作來計算不同的功能集。

我希望上面的解釋可以幫助您解決問題。

感謝,
Hokam

+0

如果它解決您的問題,可以請你接受它。 – Hokam

+0

請注意,此解決方案將重新評估'JSONValue.parse'兩次,然後它將同時執行過濾器。在這種情況下,更好的方法是對條件進行分區並在分區後分叉進程。 –

+0

Hi Brett,我們可以在使用地圖變換執行分析操作後接收到的「jsonStream」上使用persist運算符。 – Hokam

相關問題