2016-10-04 20 views
2

我正在關注sample of mapWithState function on Databricks website在Spark Streaming中使用mapWithState指定超時

爲trackstatefunction這些碼是如下:

def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    state.update(sum) 
    Some(output) 
} 

我的情況下的問題,當狀態是時序出(state.isTimingout()==true)那麼函數再次更新其可能會導致異常的SATE。樣本是否屬實?

回答

3

在狀態爲超時的情況下(state.isTimingout() == true),則函數再次更新可能導致異常的狀態。

是的,這是正確的。如果您在mapWithState上設置了明確的超時時間,並且在狀態處於最後一次超時迭代時調用state.update,則會導致拋出異常,因爲一旦發生超時就無法更新狀態。這是明確表示in the documentation

國家不能被更新,如果它已經被刪除(即 remove()方法已經被調用),或者將被刪除,由於 超時(即,isTimingOut()是真的)。


在你的榜樣,一個額外的檢查是爲了:

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    if (!state.isTimingOut) state.update(sum) 
    Some(output) 
} 

或者,由於value只應None一旦發生超時,您可以使用模式匹配,以及:

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[Long]): Option[(String, Long)] = { 
    value match { 
    case Some(v) => 
     val sum = v.toLong + state.getOption.getOrElse(0L) 
     state.update(sum) 
     Some((key, sum)) 
    case _ if state.isTimingOut() => (key, state.getOption.getOrElse(0L)) 
    } 
} 

有關有狀態數據流的審查,請參閱this blog post(免責聲明:我是作者)。

+0

hi @Yuval,所以如果一個特定的鍵超時,所有的狀態都消失了?你需要從頭開始? – marios

+1

@marios是的,超時後,鍵被標記爲刪除。 –

+0

我想如果你需要在超時後堅持你的狀態,你需要自己動手嗎?感謝Yuval! – marios

相關問題