2017-08-07 87 views
0

如何分析uv,pv,ip每隔5分鐘一天,並存儲Mysql。數據是從卡夫卡的格式如下:如何使用spark分析pv,uv,ip每隔5分鐘

Message sent: {"cookie":"a95f22eabc4fd4b580c011a3161a9d9d","ip":"125.119.144.252","event_time":"2017-08-07 10:50:16"} 
Message sent: {"cookie":"6b67c8c700427dee7552f81f3228c927","ip":"202.109.201.181","event_time":"2017-08-07 10:50:26"} 

這就像00:00-00:05 00:05--00:10等等, 我用:

val write=new JDBCSink() 
     val query=counts.writeStream.foreach(write).outputMode("complete") 
      .trigger(ProcessingTime("5 minutes"))  
      .start() 

但當我在00:01提交或者崩潰時,我怎麼能確定它不會像00:01-00:06那樣進行分析。

回答

0

使用window功能:

query = counts.groupBy(window('event_time', '5 second')).agg() 
query.writeStream.start() 
+0

PV,UV計算是最後一天了,窗口是無狀態的,如果我使用的窗前,彷彿這個窗口($ 「UNIX_TIMESTAMP」, 「1天」,「5分鐘「)它也應該在00:00而不是00:01運行程序 – Aaron