1
我正在使用Spark-streaming和RabbitMQ。因此,流式作業從RabbitMQ獲取數據並應用一些轉換和操作。所以,我想知道如何在同一個流上應用多個動作(即計算兩個不同的功能集)。可能嗎?如果是,如何將流對象傳遞給代碼中提到的多個類?如何在同一個Spark Streaming上運行多個操作
val config = ConfigFactory.parseFile(new File("SparkStreaming.conf"))
val conf = new SparkConf(true).setAppName(config.getString("AppName"))
conf.set("spark.cleaner.ttl", "120000")
val sparkConf = new SparkContext(conf)
val ssc = new StreamingContext(sparkConf, Seconds(config.getLong("SparkBatchInterval")))
val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("RealTimeQueueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("QueueExchangeName"), "routingKeys" -> config.getString("QueueRoutingKey"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()
如何從這裏處理流:
val objProcessFeatureSet1 = new ProcessFeatureSet1(Some_Streaming_Object)
val objProcessFeatureSet2 = new ProcessFeatureSet2(Some_Streaming_Object)
ssc.start()
ssc.awaitTermination()
如果它解決您的問題,可以請你接受它。 – Hokam
請注意,此解決方案將重新評估'JSONValue.parse'兩次,然後它將同時執行過濾器。在這種情況下,更好的方法是對條件進行分區並在分區後分叉進程。 –
Hi Brett,我們可以在使用地圖變換執行分析操作後接收到的「jsonStream」上使用persist運算符。 – Hokam