我想計算每個鍵的值的PairDStream的中值。Apache Spark Streaming:通過密鑰的窗口化PairDStream的中值
我已經嘗試以下,這是非常效率不高:
JavaPairDStream<String, Iterable<Float>> groupedByKey = pairDstream.groupByKey();
JavaPairDStream<String, Float> medianPerPlug1h = groupedByKey.transformToPair(new Function<JavaPairRDD<String,Iterable<Float>>, JavaPairRDD<String,Float>>() {
public JavaPairRDD<String, Float> call(JavaPairRDD<String, Iterable<Float>> v1) throws Exception {
return v1.mapValues(new Function<Iterable<Float>, Float>() {
public Float call(Iterable<Float> v1) throws Exception {
List<Float> buffer = new ArrayList<Float>();
long count = 0L;
Iterator<Float> iterator = v1.iterator();
while(iterator.hasNext()) {
buffer.add(iterator.next());
count++;
}
float[] values = new float[(int)count];
for(int i = 0; i < buffer.size(); i++) {
values[i] = buffer.get(i);
}
Arrays.sort(values);
float median;
int startIndex;
if(count % 2 == 0) {
startIndex = (int)(count/2 - 1);
float a = values[startIndex];
float b = values[startIndex + 1];
median = (a + b)/2.0f;
} else {
startIndex = (int)(count/2);
median = values[startIndex];
}
return median;
}
});
}
});
medianPerPlug1h.print();
有人可以幫助我更有效的交易?我有大約1950個不同的密鑰,每個密鑰可以達到3600(每秒1個數據點,1小時窗口)值,在哪裏可以找到中值。
謝謝!
你需要多長時間一次計算中位數?你在使用滑動窗口嗎? – vanekjar
實際上我使用的是一個1h的窗口(所以數據是受限制的,並且在這個預熱時間之後不會增長),並且每個幻燈片的持續時間和間隔爲2秒。我可以增加批次和滑動間隔,但我想盡可能快地進行計算。 對我來說,找到一個更好的轉換來獲得更多Spark的並行算法會更有趣。 –