2016-04-21 30 views

How do I count by attribute and time window with Apache Flink?

Suppose I have a file of the following form (one event per line):

Source,Timestamp
aa,2014-05-02 22:12:11
bb,2014-05-02 22:22:11

I want to count the events per source in consecutive 5-minute time windows. How can I do this with Flink?
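To make the desired result concrete, here is a minimal plain-Java sketch (no Flink involved) of counting events per source and 5-minute window. The class name `TumblingCountSketch`, the `source@windowStart` key format, the UTC interpretation of the timestamps, and the extra `aa,2014-05-02 22:14:59` sample line are all assumptions made for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only: counts events per source and 5-minute window.
class TumblingCountSketch {
    static final long WINDOW_MILLIS = 5 * 60 * 1000L;
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses e.g. "2014-05-02 22:12:11" to epoch milliseconds (assumes UTC).
    static long toEpochMillis(String timestamp) {
        return LocalDateTime.parse(timestamp, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Start of the epoch-aligned 5-minute window that contains the timestamp.
    static long windowStart(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, WINDOW_MILLIS);
    }

    // Maps "source@windowStart" to the number of events in that window.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",", 2);
            long start = windowStart(toEpochMillis(parts[1].trim()));
            counts.merge(parts[0].trim() + "@" + start, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "aa,2014-05-02 22:12:11",
                "aa,2014-05-02 22:14:59", // hypothetical extra event in the same window
                "bb,2014-05-02 22:22:11");
        count(lines).forEach((key, n) -> System.out.println(key + " -> " + n));
    }
}
```

Both `aa` events fall into the window starting at 22:10:00, while the `bb` event falls into the window starting at 22:20:00, so the expected output has two entries.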

This is what I have so far:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Event> stream = env.fromCollection(new EventFileReader(new File("path/to/file")), Event.class);

stream
    .keyBy("getSource()")
    .timeWindow(Time.minutes(5))
    .sum("getTimestamp()");

env.execute();

public class Event { 
    private final String source; 
    private final long timestamp; 

    public Event(String source, long timestamp) { 
     this.source = source; 
     this.timestamp = timestamp; 
    } 

    public String getSource() { 
     return source; 
    } 

    public long getTimestamp() { 
     return timestamp; 
    } 
} 

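I'm missing two things. First, this fails with an error saying that the Event class is not a POJO. Second, I don't know how to count the number of events in a window. Right now I'm using .sum("getTimestamp()"), but I'm sure that's not right. Any ideas?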

Answer


I would recommend using a fold function to do the window aggregation. The following code snippet should do the job:

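import javax.annotation.Nullable;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Job {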
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     DataStream<Event> stream = env.fromElements(new Event("a", 1), new Event("b", 2), new Event("a", 2)).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Event>() { 
      @Nullable 
      @Override 
      public Watermark checkAndGetNextWatermark(Event event, long l) { 
       return new Watermark(l); 
      } 

      @Override 
      public long extractTimestamp(Event event, long l) { 
       return event.getTimestamp(); 
      } 
     }); 

     DataStream<Tuple2<String, Integer>> count = stream.keyBy(new KeySelector<Event, String>() { 
       @Override 
       public String getKey(Event event) throws Exception { 
        return event.getSource(); 
       } 
      }) 
      .timeWindow(Time.minutes(5)) 
      .fold(Tuple2.of("", 0), new FoldFunction<Event, Tuple2<String, Integer>>() { 
       @Override 
       public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Event o) throws Exception { 
        return Tuple2.of(o.getSource(), acc.f1 + 1); 
       } 
      }); 

     count.print(); 

     env.execute(); 
    } 

    public static class Event { 
     private final String source; 
     private final long timestamp; 

     public Event(String source, long timestamp) { 
      this.source = source; 
      this.timestamp = timestamp; 
     } 

     public String getSource() { 
      return source; 
     } 

     public long getTimestamp() { 
      return timestamp; 
     } 
    } 
} 
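
The snippet above avoids the "not a POJO" error by keying with a KeySelector instead of a field-expression string. If you would rather key by a field expression, the class has to meet Flink's POJO rules: it must be public, have a public no-argument constructor, and expose each field either publicly or through a getter and setter. Below is a minimal sketch of such a class; the wrapper name `PojoEventSketch` and the setters are additions made for illustration and are not part of the original code.

```java
// Sketch only: an Event shaped to satisfy Flink's POJO rules.
class PojoEventSketch {

    public static class Event {
        // Fields are non-final so that the setters can assign them.
        private String source;
        private long timestamp;

        // A public no-argument constructor is required for POJO types.
        public Event() {
        }

        public Event(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }

        public String getSource() {
            return source;
        }

        public void setSource(String source) {
            this.source = source;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    public static void main(String[] args) {
        Event event = new Event();
        event.setSource("aa");
        event.setTimestamp(1L);
        // With a class of this shape, a field expression names the field
        // rather than the getter, e.g. stream.keyBy("source").
        System.out.println(event.getSource() + " " + event.getTimestamp());
    }
}
```

Note that a field expression refers to the field name ("source"), not to the getter call ("getSource()") used in the question.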