2017-06-01 30 views
1

編輯2:跳到最後,狀態恢復,但它不是可查詢新的tl;博士「我如何使狀態是可查詢的,從檢查點還原後仍可查詢?託管鍵控狀態不恢復與檢查指向啓用

我有檢查鍵控流指着啓用類似這樣的(我已經在內存中嘗試過這種以及HDFS具有相同的結果)

env.enableCheckpointing(60000) 
env.setStateBackend(new FsStateBackend("file:///flink-test")) 
val stream = env.addSource(consumer) 
    .flatMap(new ValidationMap()).name("ValidationMap") 
    .keyBy(x => new Tuple3[String, String, String](x.account(), x.organization(), x.`type`())) 
    .flatMap(new Foo()).name(jobname) 

在這個流,我有一個託管鍵控狀態ValueState,我將其設置爲可查詢。

val newValueStateDescriptor = new ValueStateDescriptor[java.util.ArrayList[java.util.ArrayList[Long]]]("foo", classOf[java.util.ArrayList[java.util.ArrayList[Long]]]) 
    newValueStateDescriptor.setQueryable("foo") 

    valueState = getRuntimeContext.getState[java.util.ArrayList[java.util.ArrayList[Long]]](newValueStateDescriptor) 
    valueState.update(new java.util.ArrayList[java.util.ArrayList[Long]]()) 

該列表會定期追加或刪除,並更新valueState。當我提出可查詢狀態的請求時,我目前可以看到正確的值。

在我的JobManager日誌中,我看到檢查指向每一分鐘,當我檢查文件系統時,我看到非空的文件被創建。

我的設置有3個JobManagers(2個待機),3個TaskManagers(全部3個在使用)。

我把一個單一的數據點放入系統,並將其讀出QueryableState,一切看起來都不錯。然後我選擇一個單獨的TaskManager(甚至不是處理數據的那個,3箇中的任何一個),然後我殺死它,然後重新啓動它來模擬崩潰。

我看到作業重試2到3次,直到TaskManager重新聯機,最後我看到Flink中再次運行相同的JobID,生活似乎很好。

但是,我再次打到Queryable狀態,我得到一個UnknownKvStateLocation異常。

我真的不確定我在這裏做錯了什麼,事情似乎是檢查指向,但我從來沒有設法讓我的ValueState回來?也許它回來了,但不能查詢?

編輯:從JobManager 登錄片段暗示的東西都恢復

{"level":"INFO","time":"2017-06-01 15:30:02,332","class":"org.apache.flink.runtime.executiongraph.ExecutionGraph","ndc":"", "message":"Job Foo (dc7850a6866f181c2f07968d35fe3d46) switched from state RESTARTING to CREATED."} 
{"level":"INFO","time":"2017-06-01 15:30:02,332","class":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","ndc":"", "message":"Recovering checkpoints from ZooKeeper."} 
{"level":"INFO","time":"2017-06-01 15:30:02,333","class":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","ndc":"", "message":"Found 1 checkpoints in ZooKeeper."} 
{"level":"INFO","time":"2017-06-01 15:30:02,333","class":"org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore","ndc":"", "message":"Trying to retrieve checkpoint 5."} 
{"level":"INFO","time":"2017-06-01 15:30:02,340","class":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","ndc":"", "message":"Restoring from latest valid checkpoint: Checkpoint 5 @ 1496330912627 for dc7850a6866f181c2f07968d35fe3d46."} 
{"level":"INFO","time":"2017-06-01 15:30:02,340","class":"org.apache.flink.runtime.executiongraph.ExecutionGraph","ndc":"", "message":"Job Foo (dc7850a6866f181c2f07968d35fe3d46) switched from state CREATED to RUNNING."} 

真的看起來像它的恢復,當我檢查在/弗林克測試創建的文件我看到一些二進制數據,但它包含我的Queryable State ValueState的標識名稱。任何關於尋找什麼的想法都是值得歡迎的。

EDIT2:State> is < restored,it's just not queryable!

回答

0

事實上,給定的一塊註冊狀態已經被查詢並不是當前Flink在檢查點或保存點中記錄的內容的一部分。因此,在恢復之後,直到提供新的StateDescriptor之前,狀態纔可查詢。

欲瞭解更多信息,請參閱flink-users郵件列表中的this discussion