2016-09-28 35 views
1

我正在嘗試處理日誌文件,並將兩個不同位置的結果保存到幾乎相似,而無需再次處理整個日誌文件。一個數據源的兩個輸出

例如

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
    DataSource<Integer> ds = env.fromCollection(bigData()); 


    MapOperator<Integer, Integer> hardWorkDS = ds.map(i -> { 

     System.out.println("enter hard work"); 

     return hardWork(i); 
    }); 


    saveToDB(hardWorkDS.collect()); 
    saveToAnotherDB(hardWorkDS.map(i -> moreWork(i)).collect()); 

此代碼打印數據源中元素數量的兩倍「輸入艱苦工作」。 我知道這是應該如何工作的,因爲「collect()」會在每次調用時從一開始就評估整個數據。

有沒有解決方法,我可以做,以不處理相同的數據兩次?

我知道這是可能的流媒體,但我不能使用此流媒體。

回答

2

DataSet程序可以擁有儘可能多的數據接收器。只需添加一個或多個接收器DataSet.output(OutputFormat)並致電env.execute()啓動該程序。 Flink提供了一個JDBCOutputFormat,您可以使用它來將數據寫入數據庫。

正如你注意到的,你不應該使用collect(),因爲它會立即執行程序。除了防止多個數據接收器collect()的缺點是它在將數據寫入數據庫之前將數據提取到客戶端。直接從OutputFormat寫入數據是一個更具擴展性的解決方案。

相關問題