1
假設我具有形式(每行一個事件)的一個文件:如何通過Apache Flink的屬性和時間窗口來計數?
Source,Timestamp aa,2014-05-02 22:12:11 bb,2014-05-02 22:22:11
我想總結通過源與5分鐘的連續時間窗分組事件的數量。我如何用Flink做到這一點?
我有現在的問題是:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Event> stream = env.fromCollection(new EventFileReader(new File("path/to/file")), Event.class);
stream
.keyBy("getSource()")
.timeWindow(Time.minutes(5))
.sum("getTimestamp()");
env.execute();
public class Event {
private final String source;
private final long timestamp;
public Event(String source, long timestamp) {
this.source = source;
this.timestamp = timestamp;
}
public String getSource() {
return source;
}
public long getTimestamp() {
return timestamp;
}
}
我錯過了兩件事情。首先,這失敗了,並說Event
類不是POJO。其次,我不知道如何計算窗口中的事件數量。現在我正在使用.sum("getTimestamp()")
,但我確定不是這樣。有什麼想法嗎?