2016-07-27 19 views
0

我正在加載CSV文件並使用自定義地圖功能將每一行轉換爲POJO。對於我的程序邏輯,我需要爲每個POJO提供一個從0到n(其中n爲總行數)的唯一ID。我的問題是,我可以使用轉換函數爲每個POJO分配一個唯一的ID(例如初始行號)嗎?理想的方法是在UDF中獲得一個Iterable,並在迭代輸入元組的同時增加一個變量,最後輸出相應的POJO。我的代碼目前如下所示:Apache Flink - 爲輸入分配唯一的ID

DataSet<MyType> input = env.readCsvFile("/path/file.csv") 
    .includeFields("1111") 
    .types(String.class, Double.class, Double.class,Double.class) 
    .map(new ParseData()); 

其中ParseData將元組轉換爲MyType POJO。

是否有任何實現此任務的最佳做法?

回答

2

棘手的部分是,你在分佈式系統中運行代碼,因此你的ParseData函數的並行實例是相互獨立運行的。

您仍然可以通過使用ParseData中的本地ID計數器來分配唯一ID。避免重複的技巧是正確的初始化和計數器增量。假設你有四個並行性,你會得到四個ParseData實例(我們稱它們爲PD1 ... PD4)。你會做以下ID分配:

PD1: 0, 4, 8, 12, ... 
PD2: 1, 5, 9, 13, ... 
PD3, 2, 6, 10, 14, ... 
PD4: 3, 7, 11, 15, ... 

你可以做到這一點,通過與不同的值(詳情如下)初始化並行實例,並通過您的並行遞增每個實例的數量(即ID += parallelism)。

在Flink中,並行函數的所有實例都會自動分配一個唯一編號(所謂的任務索引)。你可以使用這個數字來初始化你的ID計數器。您可以通過RuntimeContext.getIndexOfThisSubtask()獲取任務索引。您也可以通過RuntimeContext.getNumberOfParallelSubtasks()

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html

收到運營商/功能並行要獲得RuntimeContext使用RichMapFunction實現ParseDataopen()調用getRuntimeContext()

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html

像這樣的東西(只顯示相關的方法):

class ParseDate extends RichMapFunction { 
    private long parallelism; 
    private long idCounter; 

    public void open(Configuration parameters) { 
     RuntimeContext ctx = getRuntimeContext(); 
     parallelism = ctx.getNumberOfParallelSubtasks(); 
     idCounter = ctx.getIndexOfThisSubtask(); 
    } 

    public OutputDataType map(InputDataType value) { 
     OutputDataType output = new OutputDataType(); 
     output.setID(idCounter); 
     idCounter += parallelism; 
     // further processing 
     return output; 
    } 
} 
+0

謝謝,制定出適合我。我不得不添加'public void open(配置參數)'來使其工作。但是,這樣最後的ID不是連續的(在每次運行時它們被分配的方式不同),但我想這與分配給每個實例的元素的數量有關。 –

+0

修復了我答案中的開放方法 - 感謝您指出。 是的,如果數據分佈不均勻,您可能無法獲得連續的ID,這將非常困難,因爲您需要共享的全局狀態(這可能會嚴重影響您的性能)。我忽略了你的問題中的這個細節。 –