2015-12-30 61 views

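Rewriting a Spark Java application in Scala

I am trying to "convert" my Spark application (written in Java) to Scala. Since I am new to Scala and to Spark's Scala API, I don't know how to write this "transformToPair" function in Scala:

Java: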

JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction); 

*** FUNCTION *** 

private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() { 
    public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception { 

     float avgOfAll; 
     if(v1.count() > 0) { 
      avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() { 
       public Boolean call(Tuple2<String, Float> v1) throws Exception { 
        return v1._1().equals("all"); 
       } 
      }).values().collect().get(0); 
     } else { 
      avgOfAll = 0.0f; 
     } 

     final float finalAvg = avgOfAll; 

     JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() { 
      public Boolean call(Float v1) throws Exception { 
       return v1 > finalAvg; 
      } 
     }); 


     return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() { 
      public Boolean call(Tuple2<String, Boolean> v1) throws Exception { 
       return !v1._1().equals("all"); 
      } 
     }); 
    } 
}; 

Here is my attempt in Scala:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
    var avgOfAll = 0.0 

    if(rdd.count() > 0) { 
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0) 
    } 

    val finalAvg = avgOfAll 

    val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)}) 

    val rddNew = rddBool.filter({case(k, v) => (k != "all")}) 
} 

I get the following error message:

<console>:281: error: type mismatch; 
found : Unit 
required: org.apache.spark.rdd.RDD[?] 
     } 
    ^

Can anyone help me? How can I return the "rddNew" DStream?

If I put

return rddNew 

at the end of the "transform" function, I get the following error:

<console>:293: error: return outside method definition 
     return rddNew 
    ^

Answers

1

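You have to actually yield a value as the last expression of the closure. In Scala the value of a block is its last expression, and a val definition evaluates to Unit, which is why the compiler found Unit where an RDD was required. For example: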

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
    var avgOfAll = 0.0 

    if (rdd.count() > 0) {
        avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
    }

    val finalAvg = avgOfAll 

    val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)}) 

    val rddNew = rddBool.filter({case(k, v) => (k != "all")}) 

    rddNew 
} 
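
The rule behind this fix can be seen without Spark. The following is a minimal sketch on plain Scala collections; `BlockValueDemo`, `transformLike`, and the sample data are hypothetical names introduced only for illustration, with `transformLike` standing in for `transform`.

```scala
object BlockValueDemo {
  // Hypothetical stand-in for DStream.transform: it expects a function
  // that yields a collection, just as transform expects one that yields an RDD.
  def transformLike(in: Seq[(String, Float)])(
      f: Seq[(String, Float)] => Seq[(String, Boolean)]): Seq[(String, Boolean)] = f(in)

  // Made-up sample data: the "all" entry carries the overall average.
  val input: Seq[(String, Float)] =
    Seq("all" -> 2.0f, "plug1" -> 3.0f, "plug2" -> 1.0f)

  // A block whose last statement is a definition has the value () of type Unit.
  // Passing such a block to transformLike would fail with a type mismatch.
  val endsWithDefinition: Unit = {
    val flagged = input.map { case (k, v) => (k, v > 2.0f) }
  }

  // Ending the block with an expression makes that expression the result.
  val result: Seq[(String, Boolean)] = transformLike(input) { rows =>
    val flagged = rows.map { case (k, v) => (k, v > 2.0f) }
    flagged.filter { case (k, _) => k != "all" }
  }
}
```

The `return` keyword does not help here because it is only legal inside a method defined with `def`; the closure was written at the top level of the console, so the compiler reported "return outside method definition".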

Alternatively, skip the final variable in the transform closure and end on the filter expression itself:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
    var avgOfAll = 0.0 

    if (rdd.count() > 0) {
        avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
    }

    val finalAvg = avgOfAll 

    val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)}) 

    rddBool.filter({case(k, v) => (k != "all")}) 
} 

A slightly more Scala-like version might be:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 

    val finalAvg = if (rdd.count() > 0) {
        rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
    } else { 0.0 }

    val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)}) 

    rddBool.filter({case(k, v) => (k != "all")}) 
} 
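
As a further variation, the closure body could be pulled out into a named function. This is only a sketch under assumptions: it presumes the stream elements are (String, Float) pairs as in the Java code, and `OutlierSketch` and `findOutliers` are hypothetical names. It uses `lookup` from the pair-RDD API instead of `count()` followed by `collect()(0)`. Note one behavioural difference: a non-empty batch without an "all" entry falls back to 0.0f here, whereas the versions above would throw on `collect()(0)`.

```scala
import org.apache.spark.rdd.RDD

object OutlierSketch {
  // Hypothetical helper; the logic mirrors the closure passed to transform.
  def findOutliers(rdd: RDD[(String, Float)]): RDD[(String, Boolean)] = {
    // lookup returns all values stored under the key. headOption covers both
    // an empty batch and a batch with no "all" entry (assumed default: 0.0f).
    val avgOfAll: Float = rdd.lookup("all").headOption.getOrElse(0.0f)

    // The last expression is the function's result, so no return is needed.
    rdd
      .filter { case (k, _) => k != "all" }
      .mapValues(_ > avgOfAll)
  }
}
```

Under the same assumptions it would be wired in as `avgAll1h.union(avgPerPlug1h).transform(rdd => OutlierSketch.findOutliers(rdd))`.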

That did it for me. I had just tried the "rddNew" line without the RETURN keyword, and it works now! You saved my day, thank you! –


You're welcome :) – Ashalynd
