2017-07-25 115 views
1

我想流csv文件並使用flink執行sql操作。但我寫的代碼只讀取一次並停止。它不流。在此先感謝,Flink CsvTableSource Streaming

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env); 

CsvTableSource csvtable = CsvTableSource.builder() 
    .path("D:/employee.csv") 
    .ignoreFirstLine() 
    .fieldDelimiter(",") 
    .field("id", Types.INT()) 
    .field("name", Types.STRING()) 
    .field("designation", Types.STRING()) 
    .field("age", Types.INT()) 
    .field("location", Types.STRING()) 
    .build(); 

tableEnv.registerTableSource("employee", csvtable); 

Table table = tableEnv.scan("employee").where("name='jay'").select("id,name,location"); 
//Table table1 = tableEnv.scan("employee").where("age > 23").select("id,name,age,location"); 

DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class); 

//DataStream<Row> stream1 = tableEnv.toAppendStream(table1, Row.class); 

stream.print(); 
//stream1.print(); 

env.execute(); 

回答

2

CsvTableSource是基於FileInputFormat其讀取並分析由行引用的文件一致。結果行被轉發到流式查詢中。所以在CsvTableSource是流的意思,行不斷讀取和轉發。但是,CsvTableSource終止於文件的末尾。因此,它發出一個有界的流。

我認爲您期望的行爲是CsvTableSource直到它結束讀取文件,然後等待將寫入追加到文件。 但是,這不是CsvTableSource的工作原理。您需要爲此自定義TableSource

+0

感謝您的信息@Fabian Hueske –

相關問題