2016-06-10 58 views
0

我的代碼使用readTextFile讀取日誌文件,當我在Flink(/opt/flink-1.0.3/bin/flink run -m yarn-cluster -yn 2 /home/flink/flink-json-0.1.jar)中運行該jar時,它成功處理了裏面的行並停止了我的應用程序,而不是等待新行。 我需要一些參數來做它嗎?爲什麼flink停止我的流應用程序?

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.readTextFile("hdfs:///test/ignicion.io") 

預先感謝您

回答

2

您正在尋找

StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType) 

對於WatchType您有下列選項

  • ONLY_NEW_FILES,
  • REPROCESS_WITH_APPENDED,
  • PROCESS_ONLY_APPENDED;

流從

StreamExecutionEnvironment.readTextFile(String filePath, String charsetName) 

會閱讀完所有的文件後完成。我認爲,它主要是在開發過程中進行本地測試。

+0

同樣的結果:'env.readFileStream(「hdfs:///test/ignicion.io」,100,FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)' – jag

+0

你是不是說它也會立即停止? – snntrable

+0

對不起,它的工作原理與你所寫的一樣... – jag

相關問題