2017-05-31 27 views
1

我有一個spark java程序,其中帶有mapValues步驟的groupByKey已完成,並返回一個PairRDD,其值爲所有輸入rdd值的Iterable。 我已經讀過,用mapValues代替groupByKey處的reduceByKey會帶來性能上的提升,但我不知道如何在這裏應用reduceByKey來解決我的問題。如何使用reduceByKey替換groupByKey以作爲Spark java中的Iterable值返回?

具體而言,我有一個輸入對RDD,其類型爲Tuple5。在groupByKey和mapValues轉換之後,我需要獲得一個Key-Value對RDD,其中的值需要是輸入值的可迭代。

JavaPairRDD<Long,Tuple5<...>> inputRDD; 
... 
... 
... 
JavaPairRDD<Long, Iterable<Tuple5<...>>> groupedRDD = inputRDD 
    .groupByKey() 
    .mapValues(
      new Function<Iterable<Tuple5<...>>,Iterable<Tuple5<...>>>() { 

       @Override 
       public Iterable<Tuple5<...>> call(
         Iterable<Tuple5<...>> v1) 
         throws Exception { 

        /* 
        Some steps here..        
        */ 

        return mappedValue; 
       } 
      }); 

有沒有辦法讓我能得到使用reduceByKey上述轉型?

+0

這裏有些步驟是什麼?你需要一個邏輯來減少它。 – philantrovert

+0

在'mapValues'函數中,我實際上是根據'Tuple5'中的一個鍵對每個值進行排序。我認爲這裏沒有關係,這就是爲什麼我沒有包含它們。 – Vishnu

+0

_我已經讀過,用mapValues代替groupByKey中的reduceByKey會帶來性能增益_ - 你看錯了。 – zero323

回答

1

我一直在Spark上使用Scala,所以這不會是你可能更喜歡的確切答案。在groupByKey/mapValuesreduceByKey之間編碼的主要區別可使用適於從該article一個簡單的例子中可以看出:

val words = Array("one", "two", "two", "three", "three", "three") 
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) 

val wordCountsWithGroup = wordPairsRDD. 
    groupByKey. 
    mapValues(_.sum) 
wordCountsWithGroup.collect 
res1: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) 

val wordCountsWithReduce = wordPairsRDD. 
    reduceByKey(_ + _) 
wordCountsWithReduce.collect 
res2: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) 

在這個例子中,其中x => x.sum(即_.sum)在mapValues被使用,這將是(acc, x) => acc + x (即_ + _)在reduceByKey。功能簽名有很大的不同。在mapValues中,您正在處理分組值的集合,而在reduceByKey中,您正在執行縮減。

+0

據我所知,爲了得到一個RDD對的分組列表,我總是需要使用'groupKey',因爲'reduceByKey'是用於像sum這樣的聚合操作。所以在我的情況下,'reduceByKey'不是正確的嗎? – Vishnu

+0

在重新閱讀您的問題評論部分的描述之後,我會說'groupByKey'可能是要走的路,因爲我不認爲還原是一項正確的工具。 –

相關問題