要保存分區Dataset
您可以使用:
DataFrameWriter.partitionBy
- 可用,因爲星火1.6
df.write.partitionBy("someColumn").format(...).save()
DataFrameWriter.bucketBy
- 因爲星火2.0
df.write.bucketBy("someColumn").format(...).save()
可用
使用df.partitionBy("someColumn").write.format(...).save
應該可以正常工作,但Dataset
API不使用哈希碼。它使用MurmurHash
,所以結果將不同於RDD API中HashParitioner
的結果,並且不重要的檢查(如您所描述的那樣)將不起作用。
val oldHashCode = udf((x: Long) => x.hashCode)
// https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/util/Utils.scala#L1596-L1599
val nonNegativeMode = udf((x: Int, mod: Int) => {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
})
val df = spark.range(0, 10)
val oldPart = nonNegativeMode(oldHashCode($"id"), lit(3))
val newPart = nonNegativeMode(hash($"id"), lit(3))
df.select($"*", oldPart, newPart).show
+---+---------------+--------------------+
| id|UDF(UDF(id), 3)|UDF(hash(id, 42), 3)|
+---+---------------+--------------------+
| 0| 0| 1|
| 1| 1| 2|
| 2| 2| 2|
| 3| 0| 0|
| 4| 1| 2|
| 5| 2| 2|
| 6| 0| 0|
| 7| 1| 0|
| 8| 2| 2|
| 9| 0| 2|
+---+---------------+--------------------+
一個可能的疑難雜症是DataFrame
作家可以合併多個小文件,以降低成本,讓來自不同分區的數據可以在一個單一的文件中提出。
謝謝!我只是用bucketBy測試你的例子,因爲它看起來完全符合我想要的(在同一分區中給定列表中具有相同數字的行),但在執行的某個時刻,我得到了'線程中的異常'main「org.apache .spark.sql.AnalysisException:'save'現在不支持分段;' –