我正在研究寫入S3後會自動將表和分區註冊到配置單元元存儲的內容。從Spark數據集獲取配置單元分區
在我可以註冊所有分區之前,我需要知道所有的分區值。現在我正在做ds.select(partitionColumn).distinct().collectAsList();
來獲取所有的分區值。
有沒有更好的方法從我的數據集中獲取分區值?
我正在研究寫入S3後會自動將表和分區註冊到配置單元元存儲的內容。從Spark數據集獲取配置單元分區
在我可以註冊所有分區之前,我需要知道所有的分區值。現在我正在做ds.select(partitionColumn).distinct().collectAsList();
來獲取所有的分區值。
有沒有更好的方法從我的數據集中獲取分區值?
讀取Spark源代碼後,特別是AlterTableRecoverPartitionsCommand
,org.apache.spark.sql.execution.command.ddl.scala
,這是Spark實現的ALTER TABLE RECOVER PARTITIONS
。它掃描所有分區,然後註冊它們。
因此,這裏是相同的想法,掃描我們剛寫入的位置的所有分區。
從中獲取密鑰名稱,然後從中提取分區名稱/值。
以下是獲取路徑的代碼片段。
String location = "s3n://somebucket/somefolder/dateid=20171010/";
Path root = new Path(location);
Configuration hadoopConf = sparkSession.sessionState().newHadoopConf();
FileSystem fs = root.getFileSystem(hadoopConf);
JobConf jobConf = new JobConf(hadoopConf, this.getClass());
final PathFilter pathFilter = FileInputFormat.getInputPathFilter(jobConf);
FileStatus[] fileStatuses = fs.listStatus(root, path -> {
String name = path.getName();
if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
return pathFilter == null || pathFilter.accept(path);
} else {
return false;
}
});
for(FileStatus fileStatus: fileStatuses) {
System.out.println(fileStatus.getPath().getName());
}
基於這種方法,我們可以擴展當前的過濾器來完成額外的工作。 折衷是我們得到的路徑不正是我們寫到,如果SaveMode不覆蓋。在我的情況下,我現在只將它用於覆蓋模式。 –
AWS膠水已經爲你做這個。 –
我不知道更好的解決方案,這也是我的做法 –
@ThiagoBaldim我們看過AWS Glue,但它似乎並不允許我們將其用作外部產品的Metastore服務。像Tableau,Databricks等... –