2016-09-26 78 views

回答

0

Map [String,String]沒有數據集編碼器,我甚至不確定你實際上可以創建一個。

這裏有兩個版本,一個是不安全的,另一個是安全的,做你想做的事情。有效地你需要減少RDD做水平計算:

case class OnFrame(df: DataFrame) { 

    import df.sparkSession.implicits._ 

    /** 
    * If input columns don't match we'll fail at query evaluation. 
    */ 
    def unsafeRDDMap: RDD[Map[String, String]] = { 
    df.rdd.map(row => Map(row.getAs[String]("col1") -> row.getAs[String]("col2"))) 
    } 

    /** 
    * Use Dataset-to-case-class mapping. 
    * If input columns don't match we'll fail before query evaluation. 
    */ 
    def safeRDDMap: RDD[Map[String, String]] = { 
    df 
     .select($"col1" as "key", $"col2" as "value") 
     .as[OnFrame.Entry] 
     .rdd 
     .map(_.toMap) 
    } 

    def unsafeMap(): Map[String, String] = { 
    unsafeRDDMap.reduce(_ ++ _) 
    } 

    def safeMap(): Map[String, String] = { 
    safeRDDMap.reduce(_ ++ _) 
    } 

} 

如果您提供更清晰你的目標是什麼,也許我們會這樣更有效地收集一切成一個單一的地圖是一種潛在的抗星火 - 模式 - 意味着您的數據適合驅動程序。

+0

我正在使用spark 1.6,數據集概念> 2.0,我仍然可以使用它嗎? – Newbie

+0

只有不安全的版本。您的導入必須更改爲'df.sqlContext.implicits._'。 –

相關問題