Rewriting a Spark Java application in Scala
I am trying to "convert" my Spark application (written in Java) to Scala. Since I am new to Scala and to the Spark Scala API, I don't know how to write this "transformToPair" function in Scala.
The Java:
JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);

*** FUNCTION ***

private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() {
    public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {
        float avgOfAll;
        if (v1.count() > 0) {
            // take the value stored under the "all" key as the overall average
            avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() {
                public Boolean call(Tuple2<String, Float> v1) throws Exception {
                    return v1._1().equals("all");
                }
            }).values().collect().get(0);
        } else {
            avgOfAll = 0.0f;
        }
        final float finalAvg = avgOfAll;
        // flag every value that lies above the overall average
        JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
            public Boolean call(Float v1) throws Exception {
                return v1 > finalAvg;
            }
        });
        // drop the "all" entry itself
        return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() {
            public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
                return !v1._1().equals("all");
            }
        });
    }
};
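(Side note: in the Scala API there is no separate transformToPair. A pair DStream is just a DStream[(String, Float)], the pair-specific operations come in through implicits, and plain DStream.transform is used; its signature is roughly:)

// Relevant part of the Scala DStream API (sketch, not the full declaration):
// def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]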
Here is what I tried in Scala:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{ rdd =>
  var avgOfAll = 0.0
  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }
  val finalAvg = avgOfAll
  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
  val rddNew = rddBool.filter({case(k, v) => (k != "all")})
}
I get the following error message:
<console>:281: error: type mismatch;
found : Unit
required: org.apache.spark.rdd.RDD[?]
}
^
Can anyone help me? How can I return "rddNew" as the resulting DStream?
If I put
return rddNew
at the end of the "transform" function, I get the following error:
<console>:293: error: return outside method definition
return rddNew
^
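Both errors come from the same Scala rule: the value of a block is its last expression, and the return keyword is only allowed inside a def, not inside a function literal. Since the block above ends with a val definition, its type is inferred as Unit. Here is a minimal sketch with rddNew as the last expression (assuming avgAll1h and avgPerPlug1h are DStream[(String, Float)]):

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform { rdd =>
  // average stored under the "all" key, defaulting to 0.0f for an empty RDD
  val avgOfAll = if (rdd.count() > 0) rdd.filter { case (k, _) => k == "all" }.values.first() else 0.0f
  // flag values above the overall average, then drop the "all" entry itself
  val rddBool = rdd.mapValues(_ > avgOfAll)
  val rddNew = rddBool.filter { case (k, _) => k != "all" }
  rddNew // the last expression of the block is what transform returns
}

Ending the block with the rddNew line, without any return keyword, is enough; transform picks up that value as the new DStream.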
That did it for me, I just used the "rddNew" line without the return keyword and it works now! You saved my day, thank you! –
You're welcome :) – Ashalynd