2017-08-05 52 views
0

我目前正在嘗試利用每分鐘請求數據來豐富機器學習數據。數據存儲在Kafka話題中,應用程序開始加載和處理話題的全部內容 - 因此,根據我的知識,不可能使用Spark流的任何窗口操作,因爲所有數據都將同時到達。在映射過程中計算每分鐘RDD中的時間戳請求

我的方法是嘗試以下操作:

val kMeansFeatureRdd = kMeansInformationRdd.map(x => { 

    val begin = x._2 //Long - unix timestamp millis 
    val duration = x._3 //Long 
    val rpm = kMeansInformationRdd.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).count() 

    (duration, rpm) 

}) 

但是這種方法我得到以下異常:

org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758. 

有沒有辦法達到我想要做什麼?

如果您需要更多信息,請給我留言,我會更新您的需求。

編輯:

廣播RDD不起作用。廣播收集的陣列不會導致可接受的性能。

什麼將被執行,但可怕的慢,因此不是一個真正的選擇:

val collected = kMeansInformationRdd.collect() 


    val kMeansFeatureRdd = kMeansInformationRdd.map(x => { 
     val begin = x._2 //Long - unix timestamp millis 
     val duration = x._3 //Long 

     val rpm = collected.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).size 

     (duration, rpm) 

    }) 

UPDATE:

這個代碼是至少能夠把工作方式更快的完成 - 但據我發現它仍然變慢,每分鐘請求數越高,隨着過濾數組的增長 - 足夠有趣,它會變得越來越慢,我不知道爲什麼。如果有人看到問題或性能問題可以普遍改善 - 如果你讓我知道,我會很高興。

kMeansInformationRdd = kMeansInformationRdd.cache() 

    kMeansInformationRdd.sortBy(_._2, true) 

    var kMeansFeatureArray: Array[(String, Long, Long)] = Array() 
    var buffer: collection.mutable.Map[String, Array[Long]] = collection.mutable.Map() 
    var counter = 0 


    kMeansInformationRdd.collect.foreach(x => { 
     val ts = x._2 
     val identifier = x._1 //make sure the identifier represents actually the entity that receives the traffic -e.g. machine (IP?) not only endpoint 

     var bufferInstance = buffer.get(identifier).getOrElse(Array[Long]()) 

     bufferInstance = bufferInstance ++ Array(ts) 

     bufferInstance = bufferInstance.filter(p => p > ts-1000)   

     buffer.put(identifier, bufferInstance) 

     val rpm = bufferInstance.size.toLong 

     kMeansFeatureArray = kMeansFeatureArray ++ Array((identifier, x._3, rpm)) //identifier, duration, rpm 
     counter = counter +1 
     if(counter % 10000==0){ 
     println(counter) 
     println((identifier, x._3, rpm)) 
     println((instanceSizeBefore, instanceSizeAfter)) 
     } 

    }) 

    val kMeansFeatureRdd = sc.parallelize(kMeansFeatureArray) 
+0

究竟是什麼意圖?對於每個元素,找到距離開始一分鐘內的所有元素並對它們進行計數?時間範圍是否基於每個事件的具體開始很重要?或者我們可以創建連續事件的窗口並在那裏計數? – maasg

回答

0

在編輯部分給出的代碼是不正確的。這不是在Spark中廣播變量的正確方式。正確的方法是如下:

val collected = sc.broadcast(kMeansInformationRdd.collect()) 


    val kMeansFeatureRdd = kMeansInformationRdd.map(x => { 
     val begin = x._2 //Long - unix timestamp millis 
     val duration = x._3 //Long 

     val rpm = collected.value.filter(y => (x._2 - 60000 <= y._2 && x._2 >= y._2)).size 

     (duration, rpm) 

    }) 

當然,你也可以使用全局變量,以及替代sc.broadcast,但不推薦。爲什麼?

其原因是,使用外部變量直接在......之間的差(如我的所謂的「全局變量」),以及使用廣播sc.broadcast一個變量()爲:

  1. 當使用外部直接變量,spark會將串行化變量的副本與每個TASK一起發送。而按sc.broadcast,變量每個EXECUTOR發送一個副本。 Task的數量通常比Executor大10倍。所以當變量(比如一個數組)足夠大時(大於10-20K),前一種操作可能會花費很多時間進行網絡轉換,並導致頻繁的GC,從而降低了火花速度。因此建議大變量(> 10-20K)顯式廣播。

  2. 當直接使用外部變量時,變量不會被保留,它以任務結束,因此不能被重用。而sc.broadcast()這個變量是在執行者的內存中自動保存的,它會一直持續到你明確地取消它。因此sc.broadcast變量可以跨任務和階段使用。

所以如果變量預計會被多次使用,建議使用sc.broadcast()

+0

我試着廣播收集到的數組 - 但這看起來確實沒有改善性能。工作永遠運行,不會終止。而我目前只運行一個相對較小的測試集。 – LST

+0

「收集」數組中有多少個元素? – himanshuIIITian

+0

此刻150k。我只是在努力改進算法方法。我想整個數據集上的filter()並不是用這些數據去處理的。 – LST