2016-04-15 121 views
3

我想知道是否有可能在整個應用程序中保持完全不同的狀態?例如,第一個狀態的update function是否從第二個狀態調用?處理不同的狀態

我不記得經歷過任何這樣的例子,也沒有找到任何計數器指示......基於https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html的例子,我知道沒有理由爲什麼我不能有不同的trackStateFunc s與不同State s,進一步更新那些由於他們的Key,如下圖所示:

def firstTrackStateFunc(batchTime: Time, 
         key: String, 
         value: Option[Int], 
         state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    state.update(sum) 
    Some(output) 
} 

def secondTrackStateFunc(batchTime: Time, 
         key: String, 
         value: Option[Int], 
         state: State[Int]): Option[(String, Long)] = { 
    // disregard problems this example would cause 
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0L) 
    val output = (key, dif) 
    state.update(dif) 
    Some(output) 
} 

我認爲這是可能的,但仍然不能確定。我希望有人驗證或無效這個假設...

+0

你想要輸入是相同的,但有能力更新一個狀態在另一個?或者你想從第一個狀態中獲取狀態,並用它來更新兩個狀態? –

+0

我不一定要輸入是相同的,但狀態是完全不同的(如一個字段狀態和多字段其他)。能夠在第一個狀態中更新第二個狀態會很好,但不是主要目的。實際上,我想知道是否可以在同一應用程序中更新完全不同的狀態(通過不同的更新功能)?它對你更清楚嗎? – wipman

+0

我這麼認爲。國家是孤立的,你不能在不同的'mapWithState'函數內在它們之間進行交互。你可以做的就是將這些狀態鏈接在一起,並將它們作爲值傳遞給下一個'mapWithState',但我不認爲這就是你想要做的。 –

回答

2

我想知道是否有可能在整個應用程序中保持完全不同的 狀態?

DStream[(Key, Value)]mapWithState每次調用可以容納一個State[T]對象。對於mapWithState的每個調用,此T需要相同。爲了使用不同的狀態,您可以鏈接mapWithState呼叫,其中的Option[U]是另一個輸入,或者您可以拆分DStream並將不同的mapWithState呼叫應用到每個呼叫。但是,您不能在另一個對象內調用另一個對象,因爲它們彼此隔離,而且不能改變另一個對象的狀態。

1

@Yuval給出了鏈式mapWithState函數的一個很好的答案。但是,我有另一種方法。而不是有兩個mapWithState調用,你可以把sum和diff放在同一個狀態[(Int,Int)]中。

在這種情況下,您只需要一個mapWithState函數,您可以在其中更新這兩件事情。就像這樣:

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[(Long, Int)]): Option[(String, (Long, Int))] = 
{ 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0L) 
    val output = (key, (sum, diff)) 
    state.update((sum, diff)) 
    Some(output) 
}