Sum and average aggregation using DataFlow

I have sample data of the following form:
s.n., time, user, time_span, user_level
1, 2016-01-04T1:26:13, Hari, 8, admin
2, 2016-01-04T11:6:13, Gita, 2, admin
3, 2016-01-04T11:26:13, Gita, 0, user
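Now I need to find average_time_span/user, average_time_span/user_level, and total_time_span/user.

I can compute each of these values on its own, but I cannot work out how to get all of them in a single run. I am new to DataFlow, so please suggest a suitable approach.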
    static class ExtractUserAndUserLevelFn extends DoFn<String, KV<String, Long>> {
        @Override
        public void processElement(ProcessContext c) {
            // Expected columns: s.n., time, user, time_span, user_level
            String[] words = c.element().split(",");
            if (words.length == 5) {
                Instant timestamp = Instant.parse(words[1].trim());
                KV<String, Long> userTime = KV.of(words[2].trim(), Long.valueOf(words[3].trim()));
                KV<String, Long> userLevelTime = KV.of(words[4].trim(), Long.valueOf(words[3].trim()));
                // Both the per-user and the per-level pairs go into the same output PCollection.
                c.outputWithTimestamp(userTime, timestamp);
                c.outputWithTimestamp(userLevelTime, timestamp);
            }
        }
    }
    public static void main(String[] args) {
        TestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(TestOptions.class);
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
            .apply(ParDo.of(new ExtractUserAndUserLevelFn()))
            .apply(Window.<KV<String, Long>>into(
                FixedWindows.of(Duration.standardSeconds(options.getMyWindowSize()))))
            .apply(GroupByKey.<String, Long>create())
            .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
                @Override
                public void processElement(ProcessContext c) {
                    // Sum all time_span values grouped under this key.
                    String key = c.element().getKey();
                    Iterable<Long> timeSpans = c.element().getValue();
                    long sum = 0L;
                    for (Long item : timeSpans) {
                        sum += item;
                    }
                    KV<String, Long> userTime = KV.of(key, sum);
                    c.output(userTime);
                }
            }))
            .apply(MapElements.via(new FormatAsTextFn()))
            .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())
                .withNumShards(options.getShardsNumber()));
        p.run();
    }
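The manual summing loop in the pipeline above only yields totals. One way to get averages from the same grouped values is to keep a running sum and a count per key and divide at the end. Below is a minimal plain-Java sketch of that idea, not Dataflow code: the class name `MeanAccumulatorSketch` and its methods are hypothetical. The add/merge/extract shape is the same one a combiner-style transform relies on (in the Dataflow SDK this would presumably map onto a `Combine.CombineFn`), since partial results from different workers can be merged safely.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch (hypothetical names) of a mergeable sum/count accumulator for averages. */
final class MeanAccumulatorSketch {
    long sum;    // running total of the time_span values seen so far
    long count;  // number of values seen so far

    /** Folds one value into this accumulator. */
    MeanAccumulatorSketch add(long value) {
        sum += value;
        count++;
        return this;
    }

    /** Merges a partial result that was accumulated elsewhere into this one. */
    MeanAccumulatorSketch merge(MeanAccumulatorSketch other) {
        sum += other.sum;
        count += other.count;
        return this;
    }

    /** Returns the mean, or NaN when no values were added. */
    double mean() {
        return count == 0 ? Double.NaN : (double) sum / count;
    }

    /** Convenience helper: accumulates every value in the list. */
    static MeanAccumulatorSketch of(List<Long> values) {
        MeanAccumulatorSketch acc = new MeanAccumulatorSketch();
        for (long v : values) {
            acc.add(v);
        }
        return acc;
    }

    public static void main(String[] args) {
        // The two "admin" rows in the sample data have time_span 8 and 2.
        MeanAccumulatorSketch partA = of(Arrays.asList(8L));
        MeanAccumulatorSketch partB = of(Arrays.asList(2L));
        MeanAccumulatorSketch merged = partA.merge(partB);
        System.out.println("admin total=" + merged.sum + " mean=" + merged.mean());
    }
}
```

Because the accumulator carries both the sum and the count, the total and the average for a key come out of a single pass over its values.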
But I need averages over different columns and over different column values. How can I do that in a single program? – Lionel
You will want to process each of them as its own PCollection, branching off the original PCollection. –
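The branching suggested above can be sketched in plain Java as follows: parse the input once, then derive each aggregate independently from that same parsed collection. This is only an analogue of the pipeline shape, not Dataflow code. The names `BranchingSketch`, `Row`, `parseAll`, `meanBy`, and `sumBy` are hypothetical. In an actual pipeline each branch would be a separate `apply` on the same parsed PCollection, and the SDK's built-in `Mean.perKey()` and `Sum.longsPerKey()` combiners would presumably take the place of the helpers here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Plain-Java analogue (hypothetical names) of branching one parsed collection three ways. */
final class BranchingSketch {

    /** One parsed input row; fields follow the CSV header in the question. */
    static final class Row {
        final String user;
        final long timeSpan;
        final String userLevel;

        Row(String user, long timeSpan, String userLevel) {
            this.user = user;
            this.timeSpan = timeSpan;
            this.userLevel = userLevel;
        }
    }

    /** Parses the lines once, dropping any line that does not have exactly five columns. */
    static List<Row> parseAll(List<String> lines) {
        return lines.stream()
            .map(line -> line.split(","))
            .filter(f -> f.length == 5)
            .map(f -> new Row(f[2].trim(), Long.parseLong(f[3].trim()), f[4].trim()))
            .collect(Collectors.toList());
    }

    /** Branch: mean time_span per key, where the key is chosen by the caller. */
    static Map<String, Double> meanBy(List<Row> rows, Function<Row, String> key) {
        return rows.stream().collect(
            Collectors.groupingBy(key, TreeMap::new, Collectors.averagingLong(r -> r.timeSpan)));
    }

    /** Branch: total time_span per key, where the key is chosen by the caller. */
    static Map<String, Long> sumBy(List<Row> rows, Function<Row, String> key) {
        return rows.stream().collect(
            Collectors.groupingBy(key, TreeMap::new, Collectors.summingLong(r -> r.timeSpan)));
    }

    public static void main(String[] args) {
        // The three sample rows from the question.
        List<Row> rows = parseAll(Arrays.asList(
            "1, 2016-01-04T1:26:13, Hari, 8, admin",
            "2, 2016-01-04T11:6:13, Gita, 2, admin",
            "3, 2016-01-04T11:26:13, Gita, 0, user"));

        // Three independent branches over the same parsed rows.
        System.out.println("average_time_span/user:       " + meanBy(rows, r -> r.user));
        System.out.println("average_time_span/user_level: " + meanBy(rows, r -> r.userLevel));
        System.out.println("total_time_span/user:         " + sumBy(rows, r -> r.user));
    }
}
```

Keeping the user-keyed and level-keyed values in separate branches also avoids mixing the two kinds of key in one collection, which is what happens when a single DoFn emits both pairs to the same output.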
I can use [sideOutput](https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/PDoDo) for this. – Lionel