我正在嘗試爲Flink串流作業創建JUnit測試,該工作將數據寫入kafka主題,並分別使用FlinkKafkaProducer09
和FlinkKafkaConsumer09
從同一個kafka主題讀取數據。我的產品通過測試數據:如何停止程序中的閃存串流作業
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
以及檢查相同的數據從消費者來爲:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
使用TestListResultSink
。
我能看到的數據從消費者未來通過打印流預期。但無法獲得Junit測試結果,因爲即使消息完成後,消費者仍會繼續運行。所以它沒有來測試部分。
在Flink
或FlinkKafkaConsumer09
中有什麼方法可以停止進程或運行特定時間?
Hi @TillRohrmann,謝謝你的答覆。我已經嘗試在處理完所有3個元素後在地圖函數中拋出一些異常。但JUnit測試在那種我不想要的情況下顯示失敗。如果你能向我展示一個例子,那將是一件好事。感謝進取! – Mike