予取數據的流(nginx的聯機日誌),數據結構是:火花流爲時間序列處理從UDP套接字(由時間間隔劃分的數據)
date | ip | mac | objectName | rate | size
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 | book1 | 10 | 121
2016-04-05 11:17:34 | 10.0.0.2 | a5:a8 | book2351 | 8 | 2342
2016-04-05 11:17:34 | 10.0.0.3 | d1:b56| bookA5 | 10 | 12
2016-04-05 11:17:35 | 10.0.0.1 | e1:e2 | book67 | 10 | 768
2016-04-05 11:17:35 | 10.0.0.2 | a5:a8 | book2351 | 8 | 897
2016-04-05 11:17:35 | 10.0.0.3 | d1:b56| bookA5 | 9 | 34
2016-04-05 11:17:35 | 10.0.0.4 | c7:c2 | book99 | 9 | 924
...
2016-04-05 11:18:01 | 10.0.0.1 | e1:e2 | book-10 | 8 | 547547
2016-04-05 11:18:17 | 10.0.0.4 | c7:c2 | book99 | 10 | 23423
2016-04-05 11:18:18 | 10.0.0.3 | d1:b56| bookA5 | 10 | 1138
我不得不:
- 聚合數據,按分鐘分區 - 一分鐘結果行(分鐘,ip,mac)
- objectName - 可以在分鐘內更改,我必須先取第一個,即爲
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2
book1
更改爲book67
,因此必須是book1
- 速率 - 期間munute
- 大小的變化率計數 - 大小之間差別爲
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2
(內側分區前面的時間,當前的內部分區時間),即= ... 768 - 121
所以,結果(不計算尺寸):
date | ip | mac | objectName | changes | size
2016-04-05 11:17:00 | 10.0.0.1 | e1:e2 | book1 | 0 | 768 - 121
2016-04-05 11:17:00 | 10.0.0.2 | a5:a8 | book2351 | 0 | 897 - 2342
2016-04-05 11:17:00 | 10.0.0.3 | d1:b56| bookA5 | 1 | 34 - 12
2016-04-05 11:17:00 | 10.0.0.4 | c7:c2 | book99 | 0 | 924
...
2016-04-05 11:18:00 | 10.0.0.1 | e1:e2 | book-10 | 0 | 547547
2016-04-05 11:18:00 | 10.0.0.4 | c7:c2 | book99 | 0 | 23423
2016-04-05 11:18:00 | 10.0.0.3 | d1:b56| bookA5 | 0 | 1138
這裏扣我的代碼,我知道updateStateByKey
約window
,但我不明白particul阿爾利,我怎麼能刷新數據,數據庫或文件系統,當週期(分鐘)改爲:
private static final Duration SLIDE_INTERVAL = Durations.seconds(10);
private static final String nginxLogHost = "localhost";
private static final int nginxLogPort = 9999;
private class Raw {
LocalDate time; // full time with seconds
String ip;
String mac;
String objectName;
int rate;
int size;
}
private class Key {
LocalDate time; // time with 00 seconds
String ip;
String mac;
}
private class RawValue {
LocalDate time; // full time with seconds
String objectName;
int rate;
int size;
}
private class Value {
String objectName;
int changes;
int size;
}
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("TestNginxLog");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaStreamingContext jssc = new JavaStreamingContext(conf, SLIDE_INTERVAL);
jssc.checkpoint("/tmp");
JavaReceiverInputDStream<Raw> logRecords = jssc.receiverStream(new NginxUDPReceiver(nginxLogHost, nginxLogPort));
PairFunction<Raw, Key, RawValue> pairFunction = (PairFunction<Raw, Key, RawValue>) rawLine -> {
LocalDateTime time = rawLine.getDateTime();
Key k = new Key(LocalTime.of(time.getHour(), time.getMinute()), rawLine.getIp(), rawLine.getMac());
RawValue v = new RawValue(time, rawLine.getObjectName(), rawLine.getRate(), rawLine.getSize());
return new Tuple2<>(k, v);
};
JavaPairDStream<Key, RawValue> logDStream = logRecords.mapToPair(pairFunction);