我想在perKey基礎上遍歷KV pCollection的值。我用下面的代碼中使用自定義類結合起來,在使用CombineFn累加來自所有節點的數據後,合併每個鍵的所有值
PCollection<KV<String, String>> combinesAttributes =
valExtract.get(extAttUsers).apply(Combine.<String, String>perKey(
new CombineAttributes()));
,下面將我的自定義組合類,
public static class CombineAttributes implements SerializableFunction<Iterable<String>, String> {
@Override
public String apply(Iterable<String> input) {...}..}
這對於小本投入,但對於大投入並沒有如預期聯合收割機正常工作。輸出結果只有一個鍵的值很少,其他的則沒有。我假定輸出只有來自一個節點的組合數據。
https://cloud.google.com/dataflow/model/combine中的文檔提到了使用CombineFn以便在所有節點中合併每個鍵的完整收集值。
但是,當我改變了自定義如下組合函數,我收到以下錯誤,
incompatible types: CombineAttributes cannot be converted to com.google.cloud.dataflow.sdk.transforms.SerializableFunction<java.lang.Iterable<java.lang.String>,java.lang.String>
合併功能
public static class CombineAttributes extends CombineFn<Iterable<String>, CombineAttributes.Accum, String> {
public static class Accum {
List<String> inputList = new ArrayList<String>();
}
public Accum createAccumulator() { return new Accum(); }
public Accum addInput(Accum accum, Iterable<String> input) {
for (String item : input) {
accum.inputList.add(item);
}
return accum;
}
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
for (String item : accum.inputList) {
merged.inputList.add(item);
}
}
return merged;
}
public String extractOutput(Accum accum) {
return "";
}
}
有沒有示例代碼可用於聯合收割機perKey延長CombineFn
。請讓我知道上面的代碼有什麼問題。