2016-01-06 151 views
2

Sum and average aggregation using DataFlow

I have sample data of the following form:

s.n., time, user, time_span, user_level 
1, 2016-01-04T1:26:13, Hari, 8, admin 
2, 2016-01-04T11:6:13, Gita, 2, admin 
3, 2016-01-04T11:26:13, Gita, 0, user 

Now I need to find:

average_time_span/user
average_time_span/user_level
total_time_span/user

I can find each of the values above individually, but I cannot find all of them at once. As I am new to DataFlow, please suggest an appropriate approach.

static class ExtractUserAndUserLevelFn extends DoFn<String, KV<String, Long>> { 
     @Override 
     public void processElement(ProcessContext c) { 

      String[] words = c.element().split(","); 

      if (words.length == 5) { 
       Instant timestamp = Instant.parse(words[1].trim());      
       KV<String, Long> userTime = KV.of(words[2].trim(), Long.valueOf(words[3].trim())); 
       KV<String, Long> userLevelTime = KV.of(words[4].trim(), Long.valueOf(words[3].trim()));      
       c.outputWithTimestamp(userTime, timestamp); 
       c.outputWithTimestamp(userLevelTime, timestamp); 

      } 
     } 
    } 


public static void main(String[] args) { 
    TestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
      .as(TestOptions.class); 
    Pipeline p = Pipeline.create(options); 
    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) 
      .apply(ParDo.of(new ExtractUserAndUserLevelFn())) 
      .apply(Window.<KV<String, Long>>into(
        FixedWindows.of(Duration.standardSeconds(options.getMyWindowSize())))) 
      .apply(GroupByKey.<String, Long>create()) 
      .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() { 
       public void processElement(ProcessContext c) { 
        String key = c.element().getKey(); 
        Iterable<Long> docsWithThatUrl = c.element().getValue(); 
        Long sum = 0L; 
        for (Long item : docsWithThatUrl) 
         sum += item; 
        KV<String, Long> userTime = KV.of(key, sum); 
        c.output(userTime); 
       } 
      })) 
      .apply(MapElements.via(new FormatAsTextFn())) 
      .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()). 
        withNumShards(options.getShardsNumber())); 

    p.run(); 
} 

Answers

2

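One approach is to first parse the lines into a PCollection that holds one record per line, and then create two PCollections of key-value pairs from that collection. Say you define a record representing one line like this: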

static class Record implements Serializable { 
    final String user; 
    final String role; 
    final long duration; 

    Record(String user, String role, long duration) { 
        this.user = user; 
        this.role = role; 
        this.duration = duration; 
    } 
} 

Now create a LineToRecordFn that builds Records from the input lines, so that you can do:

PCollection<Record> records = p.apply(TextIO.Read.named("ReadLines") 
           .from(options.getInputFile())) 
           .apply(ParDo.of(new LineToRecordFn())); 
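The answer does not show LineToRecordFn itself. As a minimal sketch of the parsing it would need, here is the logic in plain Java with no SDK dependency. The `LineParser` class and its `parse` method are hypothetical names, the field order is taken from the sample data in the question, and skipping the header row by returning null is an assumption.

```java
import java.io.Serializable;

// Mirrors the Record class above, with a constructor.
class Record implements Serializable {
    final String user;
    final String role;
    final long duration;

    Record(String user, String role, long duration) {
        this.user = user;
        this.role = role;
        this.duration = duration;
    }
}

// Hypothetical helper: parsing logic only, no Dataflow SDK calls.
class LineParser {
    // Returns a Record, or null for a header row or malformed line.
    static Record parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 5) {
            return null;
        }
        try {
            // Columns: s.n., time, user, time_span, user_level
            long duration = Long.parseLong(fields[3].trim());
            return new Record(fields[2].trim(), fields[4].trim(), duration);
        } catch (NumberFormatException e) {
            return null; // e.g. the header row, where time_span is not a number
        }
    }
}
```

Inside a DoFn, processElement could call `LineParser.parse(c.element())` and pass any non-null result to `c.output(...)`.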

You can apply windowing here if you want. Whether you window or not, you can create PCollections keyed by role and keyed by user:

PCollection<KV<String,Long>> role_duration = records.apply(MapElements.via(
    new SimpleFunction<Record,KV<String,Long>>() { 
      @Override 
      public KV<String,Long> apply(Record r) { 
      return KV.of(r.role,r.duration); 
      } 
     })); 

PCollection<KV<String,Long>> user_duration = records.apply(MapElements.via(
    new SimpleFunction<Record,KV<String,Long>>() { 
       @Override 
       public KV<String,Long> apply(Record r) { 
       return KV.of(r.user, r.duration); 
       } 
      })); 

Now you can get the means and sums in just a few lines:

PCollection<KV<String,Double>> mean_by_user = user_duration.apply(
    Mean.<String,Long>perKey()); 
PCollection<KV<String,Double>> mean_by_role = role_duration.apply(
    Mean.<String,Long>perKey()); 
PCollection<KV<String,Long>> sum_by_role = role_duration.apply(
    Sum.<String>longsPerKey()); 

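Note that Dataflow performs some optimizations before running the job. So although it may look as if it makes two passes over the records PCollection, that may not be the case.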
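To check what the per-key mean and sum should come out to on the three sample rows from the question, the same arithmetic can be sketched in plain Java. This is only an illustration of the per-key semantics within a single window, not Dataflow code; `PerKeyStats` and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of per-key sum and mean; not Dataflow code.
class PerKeyStats {
    // Adds up the values that share a key.
    static Map<String, Long> sumPerKey(String[] keys, long[] values) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            sums.merge(keys[i], values[i], Long::sum);
        }
        return sums;
    }

    // Divides each key's sum by the number of values seen for that key.
    static Map<String, Double> meanPerKey(String[] keys, long[] values) {
        Map<String, Long> sums = sumPerKey(keys, values);
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        Map<String, Double> means = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            means.put(e.getKey(), (double) e.getValue() / counts.get(e.getKey()));
        }
        return means;
    }
}
```

With users {Hari, Gita, Gita}, levels {admin, admin, user} and time spans {8, 2, 0}, the mean per user is 8.0 for Hari and 1.0 for Gita, the mean per level is 5.0 for admin and 0.0 for user, and the total per user is 8 for Hari and 2 for Gita. In the pipeline, the total per user asked for in the question would come from applying the same Sum transform to user_duration rather than role_duration.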

1

The Mean and Sum transforms look like they would work well for this use case. Basic usage looks like this:

PCollection<KV<String, Double>> meanPerKey = 
    input.apply(Mean.<String, Integer>perKey()); 

PCollection<KV<String, Integer>> sumPerKey = input 
    .apply(Sum.<String>integersPerKey()); 
+0

But I need to find the average over different columns and over different column values. How can I do that in a single program? – Lionel

+1

You will want to process each of them as its own PCollection, branching off from the original PCollection. –

+0

I could use [sideOutput](https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/PDoDo) for this. – Lionel