2017-05-26 16 views
0

我使用Flink 1.2.0 Table API來處理一些流數據。以下是我的代碼:Flink:執行計劃中的表API複製運算符

val dataTable = myDataStream 
// table A 
val tableA = dataTable 
    .window(Tumble over 5.minutes on 'rowtime as 'w) 
    .groupBy("w, group1, group2") 
    .select("w.start as time, group1, group2, data1.sum as data1, data2.sum as data2") 
tableEnv.registerTable("tableA", tableA) 
// table A sink 
tableA.writeToSink(sinkTableA) 
//... 
// I shoul get some other outputs from TableA output 
//... 
val dataTable = tableEnv.ingest("tableA") 
// table result1 
val result1 = dataTable 
.window(Tumble over 5.minutes on 'rowtime as 'w) 
.groupBy("w, group1") 
.select("w.start as time, group1, data1.sum as data1") 
// result1 sink 
result2.writeToSink(sinkResult1) 
// table result2 
val result2 = dataTable 
.window(Tumble over 5.minutes on 'rowtime as 'w) 
.groupBy("w, group2") 
.select("w.start as time, group2, data2.sum as data1") 
// result2 sink 
result2.writeToSink(sinkResult2) 

我等待在flink執行計劃中獲取此樹。 和我在其他Flink作業中的Flink Streaming一樣。

DataStream_Operators -> TableA_Operators -> TableA_Sink 
             |-> Result1_Operators -> Result1_Sink 
             |-> Result2_Operators -> Result2_Sink 

但是,我得到這與3表的相同opertoprs副本!

DataStream_Operators -> TableA_Operators -> TableA_Sink 
        |-> Copy_of_TableA_Operators -> Result1_Operators -> Result1_Sink 
        |-> Copy_of_TableA_Operators -> Result2_Operators -> Result2_Sink 

我對結果中的這項工作的大量輸入數據表現不佳。

我該如何解決這個問題並獲得最佳執行計劃?

我不確定,Flink Table API和SQL是什麼實驗性功能,並且 也許它會在下一個版本中修復。

回答

0

在當前狀態下,只要您將Table轉換爲DataSetDataStream或將其寫入TableSink,Table API就會轉換整個查詢。在你的程序中,你會調用三次writeToSink這意味着每次翻譯完整的查詢。

但是完整的查詢是什麼?在Table上應用了所有Table API運算符。當您在TableEnvironment中註冊Table時,它基本上被註冊爲一個視圖,即只有其定義(所有定義該表的操作符)都被註冊。因此,第二次和第三次致電writeToSink時,這些操作員會再次翻譯。

,如果你不是爲它註冊爲Table翻譯tableADataStream並註冊在TableEnvironmentDataStream就可以解決這個問題。這看起來如下:

val tableA = ... 
val streamA = tableA.toDataStream[X] // X should be a case class for rows of tableA 
val tableEnv.registerDataStream("tableA", streamA) 

tableEnv.ingest("tableA").writeToSink(sinkTableA) // emit tableA by ingesting the registered DataStream 

我知道,這是不是很方便,但此刻避免重複翻譯表的唯一方法。

+0

將TableA結果翻譯爲「DataStream」時,我不能使用case類來處理'TableState'結果,因爲組和選擇字段是我作業中配置文件的可配置性。這是原因之一,爲什麼我使用Flink Table API而不是Flink Streaming,我可以在這裏使用case class來輸入和輸出數據。所以,我應該使用Row類來進行DataStream翻譯。但是,我不能使用'Row'來選擇'resul1'和'resul2'(組和選擇字段也可配置)! – SergeySA

+0

如果我使用'Row'與'val resultStream = result.toDataStream [Row]; tableEnv.registerDataStream(「TableA」,resultStream)''resul1'選擇有錯誤:'線程中的異常「main」org.apache.flink.table.api.ValidationException:無法解析[group1]給定輸入[f0,f1, f2,f3,f4]'。我嘗試使用'row'與文件列表'tableEnv.registerDataStream(「TableA」,resultStream,字段)'並得到錯誤'線程中的異常「main」org.apache.flink.table.api.TableException:類型行的源(f0:時間戳,f1:字符串,f2:字符串,f3:長,f4:長)不能轉換爲表格。 – SergeySA