2017-10-12 41 views
0

我正在研究寫入S3後會自動將表和分區註冊到配置單元元存儲的內容。從Spark數據集獲取配置單元分區

在我可以註冊所有分區之前,我需要知道所有的分區值。現在我正在做ds.select(partitionColumn).distinct().collectAsList();來獲取所有的分區值。

有沒有更好的方法從我的數據集中獲取分區值?

+0

AWS膠水已經爲你做這個。 –

+0

我不知道更好的解決方案,這也是我的做法 –

+0

@ThiagoBaldim我們看過AWS Glue,但它似乎並不允許我們將其用作外部產品的Metastore服務。像Tableau,Databricks等... –

回答

0

讀取Spark源代碼後,特別是AlterTableRecoverPartitionsCommand,org.apache.spark.sql.execution.command.ddl.scala,這是Spark實現的ALTER TABLE RECOVER PARTITIONS。它掃描所有分區,然後註冊它們。

因此,這裏是相同的想法,掃描我們剛寫入的位置的所有分區。

從中獲取密鑰名稱,然後從中提取分區名稱/值。

以下是獲取路徑的代碼片段。

String location = "s3n://somebucket/somefolder/dateid=20171010/"; 
Path root = new Path(location); 

Configuration hadoopConf = sparkSession.sessionState().newHadoopConf(); 
FileSystem fs = root.getFileSystem(hadoopConf); 

JobConf jobConf = new JobConf(hadoopConf, this.getClass()); 
final PathFilter pathFilter = FileInputFormat.getInputPathFilter(jobConf); 

FileStatus[] fileStatuses = fs.listStatus(root, path -> { 
    String name = path.getName(); 
    if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) { 
     return pathFilter == null || pathFilter.accept(path); 
    } else { 
     return false; 
    } 
}); 

for(FileStatus fileStatus: fileStatuses) { 
    System.out.println(fileStatus.getPath().getName()); 
} 
+0

基於這種方法,我們可以擴展當前的過濾器來完成額外的工作。 折衷是我們得到的路徑不正是我們寫到,如果SaveMode不覆蓋。在我的情況下,我現在只將它用於覆蓋模式。 –

相關問題