2015-09-04 22 views
0

我想計算每個鍵的值的PairDStream的中值。Apache Spark Streaming:通過密鑰的窗口化PairDStream的中值

我已經嘗試以下,這是非常效率不高:

JavaPairDStream<String, Iterable<Float>> groupedByKey = pairDstream.groupByKey(); 

JavaPairDStream<String, Float> medianPerPlug1h = groupedByKey.transformToPair(new Function<JavaPairRDD<String,Iterable<Float>>, JavaPairRDD<String,Float>>() { 
     public JavaPairRDD<String, Float> call(JavaPairRDD<String, Iterable<Float>> v1) throws Exception { 
      return v1.mapValues(new Function<Iterable<Float>, Float>() { 
       public Float call(Iterable<Float> v1) throws Exception { 

        List<Float> buffer = new ArrayList<Float>(); 

        long count = 0L; 
        Iterator<Float> iterator = v1.iterator(); 
        while(iterator.hasNext()) { 
         buffer.add(iterator.next()); 
         count++; 
        } 

        float[] values = new float[(int)count]; 

        for(int i = 0; i < buffer.size(); i++) { 
         values[i] = buffer.get(i); 
        } 

        Arrays.sort(values); 

        float median; 

        int startIndex; 

        if(count % 2 == 0) { 
         startIndex = (int)(count/2 - 1); 

         float a = values[startIndex]; 
         float b = values[startIndex + 1]; 

         median = (a + b)/2.0f; 
        } else { 
         startIndex = (int)(count/2); 

         median = values[startIndex]; 
        } 

        return median; 
       } 
      }); 
     } 
}); 

medianPerPlug1h.print(); 

有人可以幫助我更有效的交易?我有大約1950個不同的密鑰,每個密鑰可以達到3600(每秒1個數據點,1小時窗口)值,在哪裏可以找到中值。

謝謝!

+0

你需要多長時間一次計算中位數?你在使用滑動窗口嗎? – vanekjar

+0

實際上我使用的是一個1h的窗口(所以數據是受限制的,並且在這個預熱時間之後不會增長),並且每個幻燈片的持續時間和間隔爲2秒。我可以增加批次和滑動間隔,但我想盡可能快地進行計算。 對我來說,找到一個更好的轉換來獲得更多Spark的並行算法會更有趣。 –

回答

0

首先,我不知道你爲什麼使用Spark來完成這種任務。考慮到你只有幾千個值,它似乎與大數據無關。它可能使事情更加複雜。但是讓我們假設你正在計劃擴大到更大的數據集。

我會嘗試使用一些更優化的算法來查找中值,而不僅僅是排序值。排序值的數組運行於O(n * log n)時間。

你可以考慮使用一些線性時間的中位數算法像Median of medians

+0

我剛剛學習Spark,想要嘗試並行計算數據集有趣值的能力。它應該是稍後適應更大數據集的原型。但是,對於龐大的數據集,這種邏輯似乎太昂貴了,正如你上面提到的那樣。 我會在接下來的幾天看看Medians的中位數,謝謝你的信息! –

0

1)避免使用groupbykey; reducebykey比groupbykey更高效。 2)reduceByKeyAndWindow(Function2,windowduration,slideDuration)可以更好地爲您服務。

例如: JavaPairDStream合併= yourRDD.reduceByKeyAndWindow(新功能2(){ 公共字符串呼叫(字符串爲arg0,字符串ARG1)拋出異常{ 返回爲arg0 + 「」 + ARG1; } },Durations.seconds (windowDur),Durations.seconds(slideDur));

假設此RDD的輸出如下: (key,1,2,3,4,5,6,7) (key,1,2,3,4,5,6,7) 。 現在對於每個鍵,你可以解析這個,你將會得到如下數值:012 + 1 + 2 + 3 + 4 + 5 + 6 + 7/count。

我希望它有幫助:)

相關問題