棘手的部分是,你在分佈式系統中運行代碼,因此你的ParseData
函數的並行實例是相互獨立運行的。
您仍然可以通過使用ParseData
中的本地ID計數器來分配唯一ID。避免重複的技巧是正確的初始化和計數器增量。假設你有四個並行性,你會得到四個ParseData
實例(我們稱它們爲PD1 ... PD4
)。你會做以下ID分配:
PD1: 0, 4, 8, 12, ...
PD2: 1, 5, 9, 13, ...
PD3, 2, 6, 10, 14, ...
PD4: 3, 7, 11, 15, ...
你可以做到這一點,通過與不同的值(詳情如下)初始化並行實例,並通過您的並行遞增每個實例的數量(即ID += parallelism
)。
在Flink中,並行函數的所有實例都會自動分配一個唯一編號(所謂的任務索引)。你可以使用這個數字來初始化你的ID計數器。您可以通過RuntimeContext.getIndexOfThisSubtask()
獲取任務索引。您也可以通過RuntimeContext.getNumberOfParallelSubtasks()
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RuntimeContext.html
收到運營商/功能並行要獲得RuntimeContext
使用RichMapFunction
實現ParseData
和open()
調用getRuntimeContext()
。
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/functions/RichFunction.html
像這樣的東西(只顯示相關的方法):
class ParseDate extends RichMapFunction {
private long parallelism;
private long idCounter;
public void open(Configuration parameters) {
RuntimeContext ctx = getRuntimeContext();
parallelism = ctx.getNumberOfParallelSubtasks();
idCounter = ctx.getIndexOfThisSubtask();
}
public OutputDataType map(InputDataType value) {
OutputDataType output = new OutputDataType();
output.setID(idCounter);
idCounter += parallelism;
// further processing
return output;
}
}
謝謝,制定出適合我。我不得不添加'public void open(配置參數)'來使其工作。但是,這樣最後的ID不是連續的(在每次運行時它們被分配的方式不同),但我想這與分配給每個實例的元素的數量有關。 –
修復了我答案中的開放方法 - 感謝您指出。 是的,如果數據分佈不均勻,您可能無法獲得連續的ID,這將非常困難,因爲您需要共享的全局狀態(這可能會嚴重影響您的性能)。我忽略了你的問題中的這個細節。 –