2016-11-14 58 views
2

問題:我正在嘗試重新分區數據集,以便在指定的整數列中具有相同編號的所有行位於同一個分區中。使用Apache Spark進行重新分區

什麼是工作:當我使用RDD的1.6 API(使用Java)時,我使用散列分區器,並按預期工作。例如,如果我爲每行打印此列的每個值的模數,我會在給定的分區中獲得相同的模數(我通過手動讀取使用saveAsHadoopFile保存的內容來讀取分區)。

預期與最新的API

但現在我想使用2.0.1 API(斯卡拉)和具有再分配方法,它採取了一些分區的數據集和它不工作列,並將此DataSet另存爲parquet文件。如果我在分區中查看列中沒有分區的行,結果就不一樣了。

回答

5

要保存分區Dataset您可以使用:

  • DataFrameWriter.partitionBy - 可用,因爲星火1.6

    df.write.partitionBy("someColumn").format(...).save() 
    
  • DataFrameWriter.bucketBy - 因爲星火2.0

    df.write.bucketBy("someColumn").format(...).save() 
    
可用

使用df.partitionBy("someColumn").write.format(...).save應該可以正常工作,但Dataset API不使用哈希碼。它使用MurmurHash,所以結果將不同於RDD API中HashParitioner的結果,並且不重要的檢查(如您所描述的那樣)將不起作用。

val oldHashCode = udf((x: Long) => x.hashCode) 

// https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/util/Utils.scala#L1596-L1599 
val nonNegativeMode = udf((x: Int, mod: Int) => { 
    val rawMod = x % mod 
    rawMod + (if (rawMod < 0) mod else 0) 
}) 

val df = spark.range(0, 10) 

val oldPart = nonNegativeMode(oldHashCode($"id"), lit(3)) 
val newPart = nonNegativeMode(hash($"id"), lit(3)) 

df.select($"*", oldPart, newPart).show 
+---+---------------+--------------------+ 
| id|UDF(UDF(id), 3)|UDF(hash(id, 42), 3)| 
+---+---------------+--------------------+ 
| 0|    0|     1| 
| 1|    1|     2| 
| 2|    2|     2| 
| 3|    0|     0| 
| 4|    1|     2| 
| 5|    2|     2| 
| 6|    0|     0| 
| 7|    1|     0| 
| 8|    2|     2| 
| 9|    0|     2| 
+---+---------------+--------------------+ 

一個可能的疑難雜症是DataFrame作家可以合併多個小文件,以降低成本,讓來自不同分區的數據可以在一個單一的文件中提出。

+0

謝謝!我只是用bucketBy測試你的例子,因爲它看起來完全符合我想要的(在同一分區中給定列表中具有相同數字的行),但在執行的某個時刻,我得到了'線程中的異常'main「org.apache .spark.sql.AnalysisException:'save'現在不支持分段;' –