0
我需要從服務器端日誌中計算應用程序的「新用戶,活動用戶」。 我已經實現了scala和spark的日常計算算法。這項工作每天提交一次,並獲得當天的所有結果。它運作良好。
這裏是我的舊日常算法實現的一些僞代碼。此代碼運行,每天一次,並得到了一組每日結果:如何使用spark-streaming實現日常計算並獲得實時結果
// Get today log from hbase or somewhere else
val log = getRddFromHbase(todayDate)
// Compute active user
val activeUser = log.map(line => ((line.uid, line.appId), line).reduceByKey(distinctStrategyMethod)
// Get history user from hdfs
val historyUser = loadFromHdfs(path + yesterdayDate)
// Compute new user from active user and historyUser
val newUser = activeUser.subtractByKey(historyUser)
// Get new history user
val newHistoryUser = historyUser.union(newUser)
// Save today history user
saveToHdfs(path + todayDate)
現在我想獲得「實時」的結果:
1的結果應重新計算,並改爲每5分鐘或減。
2.結果應該在一天的開始時爲0,並且與一天結束時的舊算法相同。
我認爲如果我使用一個恆定的時間窗口(1天,每5分鐘滑動一次,我認爲)來實現算法是不對的。
如果有人能爲我提供一些想法或示例,我將不勝感激。謝謝你的時間。
其實我可以正確讀取從卡夫卡流和過程DSTREAM。我需要一些關於流式算法,spark-streaming的api和解決這類問題的思考。也許你可以在這個問題上幫助我。這是更具體的。 [如何使用spark-streaming從流中計算新元素](http://stackoverflow.com/questions/34786117/how-to-count-new-element-from-stream-by-using-spark-streaming) – yyforever1988