2017-02-15 37 views
1

我已成功集成代碼以將消息從事件集線器中取出並通過spark/spark-streaming處理它們。消息通過時,我正在轉向管理狀態。這是我正在使用的代碼,它大部分是https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlSpark Streaming和Azure事件集線器mapWithState

本質上,它適用於一個虛擬源,它在單個分區上使用單個流但它不適用於工會化窗口流..雖然我可以爲每個分區創建一個流的多個實例它有點擊敗工會和窗口的重點.. +我試圖讓它工作的方式失敗。我有點卡住在何處走了..如果任何人有這將是盛大的任何想法的靈感..

val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfiguration).getOrCreate() 

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10)) 
streamingContext.checkpoint(inputOptions.checkpointDir) 

//derive the stream and window 
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters) 
val eventHubsWindowedStream = eventHubsStream.window(Seconds(10)) 

val initialRDD = sparkSession.sparkContext.parallelize(List(("dummy", 100L), ("source", 32L))) 
val stateSpec = StateSpec.function(trackStateFunc _) 
    .initialState(initialRDD) 
    .numPartitions(2) 
    .timeout(Seconds(60)) 

val eventStream = eventHubsWindowedStream 
    .map(messageStr => { 
    //parse teh event 
    var event = gson.fromJson(new String(messageStr), classOf[Event]) 

    //return a tuble of key/value pair 
    (event.product_id.toString, 1) 
    }) 

val eventStateStream = eventStream.mapWithState(stateSpec) 

val stateSnapshotStream = eventStateStream.stateSnapshots() 
stateSnapshotStream.print() 

stateSnapshotStream.foreachRDD { rdd => 
    import sparkSession.implicits._ 
    rdd.toDF("word", "count").registerTempTable("batch_word_count") 
} 

streamingContext.remember(Minutes(1)) 

streamingContext 
+0

*它不適用於聯合化窗口流*什麼不工作? –

+0

道歉,實質上是從未調用狀態函數。我無法調試到這一點。當我使用示例代碼的罰款..當我使用一個單一的流它的罰款..但不是當我使用unionized流或窗口.. –

+0

你嘗試在IDE本地調試嗎? –

回答

0

我解決我的問題,我結束了使用直接流和我的所有問題已經消失。我避免了這一點,因爲進度目錄只支持HDFS或ADL,現在我不能再在本地進行測試。

EventHubsUtils.createDirectStreams(的StreamingContext,inputOptions.namespace, inputOptions.hdfs,地圖(inputOptions.eventhub - > GetEventHubParams(inputOptions)))

儘管如此,工會流不工作..現在我只是需要弄清楚如何刪除HDFS中的進度目錄!

相關問題