我會說,這樣做的正確方法是寫自己的DOFN和GroupByKey改造後使用它:
static class CountAndMean extends DoFn<KV<String, Iterator<Double>>, String> {
@Override
public void processElement(ProcessContext c) {
long count = 0L;
double sum = 0.0;
for(Double v: c.element().getValue()){
sum += v.doubleValue();
count += 1L;
}
double mean = sum/count;
String out = c.element().getKey() + "," + String.valueOf(mean) + "," + String.valueOf(count);
c.output(out);
}
PCollection<KV<String, Double>> inCol = ... ;
PCollection<KV<String, Iterable<Double>>> perKeyCol = inCol.apply(GroupByKey.<String, Double>create());
PCollection<String> outCol = perKeyCol.apply(ParDo.named("CountAndMean").of(new CountAndMean()));
但不會計數操作返回1(因爲所有的鍵現在都是不同的)? –
Oups,這裏的確是另一種基於GroupByKey的解決方案。 – Quentin
謝謝 - 這也是我想到的。我希望我可以重複使用「Count」和「Mean」操作。 –