我使用Flink 1.2.0 Table API來處理一些流數據。以下是我的代碼:Flink:執行計劃中的表API複製運算符
val dataTable = myDataStream
// table A
val tableA = dataTable
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy("w, group1, group2")
.select("w.start as time, group1, group2, data1.sum as data1, data2.sum as data2")
tableEnv.registerTable("tableA", tableA)
// table A sink
tableA.writeToSink(sinkTableA)
//...
// I shoul get some other outputs from TableA output
//...
val dataTable = tableEnv.ingest("tableA")
// table result1
val result1 = dataTable
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy("w, group1")
.select("w.start as time, group1, data1.sum as data1")
// result1 sink
result2.writeToSink(sinkResult1)
// table result2
val result2 = dataTable
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy("w, group2")
.select("w.start as time, group2, data2.sum as data1")
// result2 sink
result2.writeToSink(sinkResult2)
我等待在flink執行計劃中獲取此樹。 和我在其他Flink作業中的Flink Streaming一樣。
DataStream_Operators -> TableA_Operators -> TableA_Sink
|-> Result1_Operators -> Result1_Sink
|-> Result2_Operators -> Result2_Sink
但是,我得到這與3表的相同opertoprs副本!
DataStream_Operators -> TableA_Operators -> TableA_Sink
|-> Copy_of_TableA_Operators -> Result1_Operators -> Result1_Sink
|-> Copy_of_TableA_Operators -> Result2_Operators -> Result2_Sink
我對結果中的這項工作的大量輸入數據表現不佳。
我該如何解決這個問題並獲得最佳執行計劃?
我不確定,Flink Table API和SQL是什麼實驗性功能,並且 也許它會在下一個版本中修復。
將TableA結果翻譯爲「DataStream」時,我不能使用case類來處理'TableState'結果,因爲組和選擇字段是我作業中配置文件的可配置性。這是原因之一,爲什麼我使用Flink Table API而不是Flink Streaming,我可以在這裏使用case class來輸入和輸出數據。所以,我應該使用Row類來進行DataStream翻譯。但是,我不能使用'Row'來選擇'resul1'和'resul2'(組和選擇字段也可配置)! – SergeySA
如果我使用'Row'與'val resultStream = result.toDataStream [Row]; tableEnv.registerDataStream(「TableA」,resultStream)''resul1'選擇有錯誤:'線程中的異常「main」org.apache.flink.table.api.ValidationException:無法解析[group1]給定輸入[f0,f1, f2,f3,f4]'。我嘗試使用'row'與文件列表'tableEnv.registerDataStream(「TableA」,resultStream,字段)'並得到錯誤'線程中的異常「main」org.apache.flink.table.api.TableException:類型行的源(f0:時間戳,f1:字符串,f2:字符串,f3:長,f4:長)不能轉換爲表格。 – SergeySA