2017-01-23 47 views
1

我是新來的火花和嘗試學習。這是一個相當簡單的問題,我有下面的代碼來減少重複鍵w.r.t到他們的值。如何在Apache Spark中執行簡單的reduceByKey?

數據幀都會有這樣的價值觀。

subject  object  

    node1  node5 
    node1  node6 
    node1  node7 
    node2  node5 
    node2  node7 

而且我希望他們能像這樣減少。

subject  object  

    node1  [node5,node6,node7] 
    node2  [node5,node7] 

我能實現這個使用groupByKey方法,但我想在這裏使用reduceByKey對此我無法理解什麼是執行這一正確的語法。

這裏是我的代碼:

DataFrame records = Service.sqlCtx().sql("SELECT subject,object FROM Graph"); 


    JavaPairRDD<String,Iterable<String>> rows = records.select("subject","object").toJavaRDD().mapToPair(
      new PairFunction<Row,String,String>(){ 

       @Override 
       public Tuple2<String, String> call(Row row) throws Exception { 
        return new Tuple2<String, String>(row.getString(0), row.getString(1)); 
       } 

      // this can be optimized if we use reduceByKey instead of groupByKey 
    }).distinct().groupByKey().cache(); 

回答

0
  • 在一般情況下,這不能與reduceByKey優化。效率低下的部分是操作不是特定的實現。
  • 此外,這不能直接與reduceByKey實現由於不相容簽名。這可以通過aggregateByKeycombineByKey完成,但它仍然不是優化。
  • 最後,如果你使用DataFrames只需使用collect_list

    import static org.apache.spark.sql.functions.*; 
    
    records.groupBy("subject").agg(collect_list(col("object"))); 
    
-1

有我們可以應用reduceByKey優化的方式,但我們必須改造1前reduceByKey。

val keyValuePairs = sc.parallelize(List(("node1","node5"),("node1","node6"),("node1","node7"),("node2","node5"),("node2","node7"))) //Input 

val mappedKV = keyValuePairs.map(x => (x._1,Seq(x._2))) 

//Transform each value of the K,V pair to 'Seq' (extra transformation) 

val reducedKV = mappedKV.reduceByKey(_++_) 

然後施加 '++' 與reduceByKey。

輸出:

階> reducedKV.collect

數組[(字符串,SEQ [字符串])] =陣列((節點2,列表(節點5,node7)),(節點1,列表(節點5,node6,node7)))

相關問題