I am currently trying to enrich machine learning data with a requests-per-minute feature. The data is stored in a Kafka topic, and on startup the application loads and processes the entire content of the topic - so, as far as I know, none of Spark Streaming's windowed operations can be used, because all of the data arrives at once. My idea is to compute the requests per minute for each timestamp in the RDD during the map step.
My approach was to try the following:
val kMeansFeatureRdd = kMeansInformationRdd.map(x => {
  val begin = x._2    // Long - unix timestamp in millis
  val duration = x._3 // Long
  // invalid: an RDD cannot be referenced from inside a transformation on itself
  val rpm = kMeansInformationRdd.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).count()
  (duration, rpm)
})
But with this approach I get the following exception:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
Is there a way to achieve what I am trying to do?
If you need more information, leave me a comment and I will update the question.
EDIT:
Broadcasting the RDD does not work. Broadcasting the collected array does not lead to acceptable performance either.
What does execute, but is terribly slow and therefore not a real option:
val collected = kMeansInformationRdd.collect()
val kMeansFeatureRdd = kMeansInformationRdd.map(x => {
  val begin = x._2    // Long - unix timestamp in millis
  val duration = x._3 // Long
  val rpm = collected.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).size
  (duration, rpm)
})
UPDATE:
UPDATE: This code at least gets the job done much faster - but as far as I can tell, it still gets slower the higher the requests per minute are, as the filtered array grows - and interestingly enough, it gets slower and slower towards the end, and I do not know why. If anyone sees a problem, or ways the performance could be improved in general, I would be glad if you let me know.
kMeansInformationRdd = kMeansInformationRdd.cache()
kMeansInformationRdd = kMeansInformationRdd.sortBy(_._2, true) // sortBy returns a new RDD; the result must be kept
var kMeansFeatureArray: Array[(String, Long, Long)] = Array()
var buffer: collection.mutable.Map[String, Array[Long]] = collection.mutable.Map()
var counter = 0
kMeansInformationRdd.collect.foreach(x => {
  val ts = x._2
  val identifier = x._1 // make sure the identifier actually represents the entity that receives the traffic - e.g. the machine (IP?), not only the endpoint
  var bufferInstance = buffer.getOrElse(identifier, Array[Long]())
  bufferInstance = bufferInstance ++ Array(ts)
  val instanceSizeBefore = bufferInstance.size
  bufferInstance = bufferInstance.filter(p => p > ts - 60000) // keep only the last minute (60000 ms)
  val instanceSizeAfter = bufferInstance.size
  buffer.put(identifier, bufferInstance)
  val rpm = bufferInstance.size.toLong
  kMeansFeatureArray = kMeansFeatureArray ++ Array((identifier, x._3, rpm)) // identifier, duration, rpm
  counter = counter + 1
  if (counter % 10000 == 0) {
    println(counter)
    println((identifier, x._3, rpm))
    println((instanceSizeBefore, instanceSizeAfter))
  }
})
val kMeansFeatureRdd = sc.parallelize(kMeansFeatureArray)
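A likely cause of the progressive slowdown is the `++ Array(...)` appends: both `kMeansFeatureArray` and `bufferInstance` copy the entire array on every append, which is quadratic in the number of rows - the cost grows as the arrays do, regardless of the requests-per-minute rate. A sketch of the same loop shape using mutable collections with amortized constant-time appends (the sample rows are illustrative; this assumes the rows are processed in timestamp order, as after the sort):

```scala
import scala.collection.mutable

// Illustrative rows: (identifier, timestamp millis, duration), sorted by timestamp.
val rows = Seq(("a", 0L, 5L), ("a", 30000L, 7L), ("a", 70000L, 9L))

val features = mutable.ArrayBuffer[(String, Long, Long)]() // amortized O(1) append
val perKey = mutable.Map[String, mutable.Queue[Long]]()

rows.foreach { case (id, ts, duration) =>
  val q = perKey.getOrElseUpdate(id, mutable.Queue[Long]())
  q.enqueue(ts)
  // the queue is in timestamp order, so evicting from the front gives a sliding one-minute window
  while (q.head <= ts - 60000) q.dequeue()
  features += ((id, duration, q.size.toLong))
}
println(features) // ArrayBuffer((a,5,1), (a,7,2), (a,9,2))
```

Evicting from the front of the queue also replaces the per-row `filter` copy, so each timestamp is added and removed exactly once.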
What exactly is the intent? For each element, find all elements within one minute before its start and count them? Is it important that the time range is anchored at each event's specific start? Or could we create windows of consecutive events and count within those? – maasg
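If fixed one-minute windows (rather than a window anchored at each event's own start) are acceptable, the count can stay inside Spark: key each record on its minute bucket, count per bucket with `reduceByKey`, and join the counts back. A sketch, assuming `kMeansInformationRdd: RDD[(String, Long, Long)]` holds (identifier, timestamp millis, duration) as in the question:

```scala
// Requests per fixed one-minute bucket, computed distributed - no collect, no nested RDDs.
val countsPerMinute = kMeansInformationRdd
  .map { case (id, ts, _) => ((id, ts / 60000), 1L) } // key: (identifier, minute bucket)
  .reduceByKey(_ + _)

val kMeansFeatureRdd = kMeansInformationRdd
  .map { case (id, ts, duration) => ((id, ts / 60000), duration) }
  .join(countsPerMinute)                              // attach the bucket's count to each row
  .map { case (_, (duration, rpm)) => (duration, rpm) }
```

The trade-off is that an event at second 59 of a bucket only counts requests from the same bucket, not the preceding 59 seconds; whether that approximation is acceptable depends on what the k-means feature is meant to capture.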