2017-10-18 92 views
0

我讀了reducebyKey對大數據集來說是一個更好的選擇,可以減少數據的混洗,並以這種方式提高性能。我想轉換我的使用groupByKey。首先,它必須被轉換爲RDD:將groupByKey轉換爲reduceByKey

val linksNew = links.map(convertToRelationship) 
    .flatMap(bidirRelationship) 

鏈接是一個數據集和數據集的API沒有reduceByKey。當使用reduceByKey時,相當於.groupByKey(_._1)

val linksfinal = linksNew.rdd.reduceByKey(???) 

實際代碼:

enter image description here

一些數據集中的實際數據:

val biLinks = links 
    .map(convertToRelationship) 
    .flatMap(bidirRelationship) 
    .groupByKey(_._1) 
    .reduceGroups((left, right) => combineBidirerRelationships(left,right)) 
    .map(_._2._2) 

數據集的架構,只是groupByKey(_._1)之前使用enter image description here

+0

你的數據在'groupByKey()'之前的樣子是怎麼樣的? – Shaido

+0

@Shaido喜歡這樣''DataSet(String,Relationship)'然後應該如下 with groupByKey跟着,'KeyValueGroupedDataSet [String,(String,Relationship)]' – dedpo

+0

添加一個我相信會工作的答案,儘管我沒有'對它進行測試,告訴我它是如何工作的。 – Shaido

回答

1

不知道它是否更有效率,但是,應該可以將其轉換爲reduceByKey,因爲您在groupByKey之後直接執行reduceGroups

val biLinks = links 
    .map(convertToRelationship) 
    .flatMap(bidirRelationship) 
    .rdd 
    .map{row => (row.getAs[String](0), row.getAs[Relationship](1))} // See explanation below 
    .reduceByKey((left, right) => combineBidirerRelationships(left, right)) 
    .map(_._2._2) 

根據數據幀看起來像使用.rdd後,可以要求額外的轉換:短例如,使用所提供的代碼的一部分。從數據幀轉換時,生成的rdd將是RDD[Row]。但是,要使reduceByKey()工作,需要類型RDD[(A,B)]的元組rdd,其中AB是類型(它們本身也可以是元組)。如何在rdd.map(...)轉化可以structs工作


短的例子:

case class Relationship(a: Long, b: Long) 
val df = spark.createDataFrame(Seq((1, Relationship(3L, 2L)), (2, Relationship(20L, 7L)))).toDF() 
val rdd = df.rdd.map{ row => (row.getAs[String](0), row.getAs[Relationship](1))} 

這使所需的元組RDD類型,這裏RDD[(String, Relationship)]

+0

這兩個添加,'col1:String,col2:Relationship' 當我打印我們試圖轉換的模式,他們是struct1和striuct2,例如struct2是關係案例類創建的數據? – dedpo

1

我看到,reducebyKey是一個更好的選擇大型數據集,以減少洗牌和或洗牌減少方面,並提高性能。

不是。你在混淆「舊」RDD API,其中groupByKey有不同的語義。

Dataset API groupByKey + reduceGroups在舊API中使用與reduceByKey類似的執行模型。事實上,轉換爲RDD時使用效率較低的洗牌機制,代價非常高昂,所以你只會讓情況變得更糟。

相關問題