2015-05-20 62 views
4

我在Spark Streaming應用程序中使用updateStateByKey函數來保存並更新每個密鑰的狀態。問題是我想知道更新函數裏面的「key」Spark Streaming - 如何獲取updateStateByKey函數中的「密鑰」

input.updateStateByKey(updateStateByKeyOfUsers) 

def updateStateByKeyOfUsers(newUsers: Seq[Set[String]], 
          userStatus: Option[(#####)] 
          ): Option[(#####)] = { 
    //How to get the "Key" 
} 

-Tao

+2

你不覺得如果你分享了'input'的類型會有幫助嗎?基於[這個例子](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala#L74)我認爲關鍵是第55行中的「t._1」,儘管你的方法簽名看起來非常不同。它是否編譯? –

+0

是的,這個工程。謝謝你的幫助。 –

回答

4

一般來說,星火API不會讓你獲得一個關鍵。這很令人傷心。您有兩種選擇:在每個輸入中包含密鑰或將其包含在一個狀態中。

相關問題