2017-06-08 51 views
3

我正在嘗試爲Flink串流作業創建JUnit測試,該工作將數據寫入kafka主題,並分別使用FlinkKafkaProducer09FlinkKafkaConsumer09從同一個kafka主題讀取數據。我的產品通過測試數據:如何停止程序中的閃存串流作業

DataStream<String> stream = env.fromElements("tom", "jerry", "bill"); 

以及檢查相同的數據從消費者來爲:

List<String> expected = Arrays.asList("tom", "jerry", "bill"); 
List<String> result = resultSink.getResult(); 
assertEquals(expected, result); 

使用TestListResultSink

我能看到的數據從消費者未來通過打印流預期。但無法獲得Junit測試結果,因爲即使消息完成後,消費者仍會繼續運行。所以它沒有來測試部分。

FlinkFlinkKafkaConsumer09中有什麼方法可以停止進程或運行特定時間?

回答

3

根本的問題是,流媒體程序通常不是有限和無限地運行。

最好的辦法,至少就目前而言,是插入一個特殊的控制消息到您的流,它可以讓源正確地終止(簡單地停止而使讀循環讀取更多的數據)。這樣Flink就會告訴所有下游運營商,他們在消耗完所有數據之後可以停止。

或者,你可以在你的源拋出一個特殊的異常(例如一段時間後),這樣你可以(通過檢查錯誤原因)區分故障情況下的「正確」的終結。在源中拋出異常將導致程序失敗。

+0

Hi @TillRohrmann,謝謝你的答覆。我已經嘗試在處理完所有3個元素後在地圖函數中拋出一些異常。但JUnit測試在那種我不想要的情況下顯示失敗。如果你能向我展示一個例子,那將是一件好事。感謝進取! – Mike