2014-12-02 39 views
2

我正在使用火花流式傳輸來消耗kafka消息。我想從kafka獲取一些消息,而不是閱讀所有消息。所以我想讀一批消息,將它們返回給調用者並停止火花流。目前我正在傳遞batchInterval時間在spark流上下文方法的awaitTermination方法中。我現在不知道如何將處理後的數據從火花流傳回給調用者。這裏是我的代碼,我使用目前在讀取第一批數據後停止火花流式傳輸

def getsample(params: scala.collection.immutable.Map[String, String]): Unit = { 
    if (params.contains("zookeeperQourum")) 
     zkQuorum = params.get("zookeeperQourum").get 
    if (params.contains("userGroup")) 
     group = params.get("userGroup").get 
    if (params.contains("topics")) 
     topics = params.get("topics").get 
    if (params.contains("numberOfThreads")) 
     numThreads = params.get("numberOfThreads").get 
    if (params.contains("sink")) 
     sink = params.get("sink").get 
    if (params.contains("batchInterval")) 
     interval = params.get("batchInterval").get.toInt 
    val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077") 
    val ssc = new StreamingContext(sparkConf, Seconds(interval)) 
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 
    var consumerConfig = scala.collection.immutable.Map.empty[String, String] 
    consumerConfig += ("auto.offset.reset" -> "smallest") 
    consumerConfig += ("zookeeper.connect" -> zkQuorum) 
    consumerConfig += ("group.id" -> group) 
    var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2) 
    val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x)) 
    streams.foreach(rdd => rdd.foreachPartition(itr => { 
     while (itr.hasNext && size >= 0) { 
     var msg=itr.next 
     println(msg) 
     sample.append(msg) 
     sample.append("\n") 
     size -= 1 
     } 
    })) 
    ssc.start() 
    ssc.awaitTermination(5000) 
    ssc.stop(true) 
    } 

因此,而不是在所謂的「樣品」的字符串生成器保存郵件的我想返回到調用者。

回答

3

您可以實現StreamingListener,然後裏面,onBatchCompleted你可以調用ssc.stop()

private class MyJobListener(ssc: StreamingContext) extends StreamingListener { 

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized { 

    ssc.stop(true) 

    } 

} 

這是你如何附上您SparkStreaming到JobListener:

val listen = new MyJobListener(ssc) 
ssc.addStreamingListener(listen) 

ssc.start() 
ssc.awaitTermination() 
+1

火花1.6 .1,嘗試使用解決方案時出現以下異常:'org.apache.spark.SparkException:無法停止AsynchronousListenerBus'偵聽器線程中的StreamingContext。任何想法如何解決這一問題? – pederpansen 2016-11-29 12:57:05

0

我們可以使用下面的代碼段

var sampleMessages=streams.repartition(1).mapPartitions(x=>x.take(10)) 

獲得樣本消息,如果我們想要第一批後停止那麼我們就應該執行我們自己的StreamingListener接口,並應停止在onBatchCompleted方法流。