2016-06-14 18 views
3

我想根據特性將一個巨大的csv文件細分爲不同的分區來優化Spark應用程序的運行時間。Apache Spark:使用文件夾結構來減少分析的運行時間

E.g.我有一個包含客戶ID(整數,a)的列,包含日期(月+年,例如01.2015,b)的列和包含產品ID(整數,c)的列(以及包含產品特定數據的更多列,不需要爲分區)。

我想建立一個像/customer/a/date/b/product/c這樣的文件夾結構。當用戶想要了解2016年1月售出的來自客戶X的產品信息時,他可以加載和分析保存在/customer/X/date/01.2016/*中的文件。

是否有可能通過通配符加載這樣的文件夾結構?應該也可以加載特定時間範圍內的所有客戶或產品,例如, 01.2015至09.2015。是否可以使用通配符如/customer/*/date/*.2015/product/c?或者如何解決這樣的問題呢?

我想對數據進行一次分區,稍後在分析中加​​載特定文件,以減少這些作業的運行時間(忽略分區的額外工作)。

解決方案:木地板的工作文件

我改變了我的星火應用程序保存我的數據有木文件,現在一切工作正常,並通過給文件夾結構,我可以預先選擇數據。在這裏我的代碼片段:

JavaRDD<Article> goodRdd = ... 

SQLContext sqlContext = new SQLContext(sc); 

List<StructField> fields = new ArrayList<StructField>(); 
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false)); 
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false)); 

StructType schema = DataTypes.createStructType(fields); 

JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() { 
    public Row call(Article article) throws Exception { 
     return RowFactory.create(article.getKeyStore(), article.getTextArticle()); 
    } 
}); 

DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema); 

// WRITE PARQUET FILES 
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/"); 

// READ PARQUET FILES 
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/"); 

System.out.println("READ : " + read.count()); 

重要

不要用表試試只用一列!當您嘗試撥打partitionBy方法時,您會得到例外!

+0

不能創建爲HDFS路徑蜂巢表?配置表格支持動態分區和靜態分區。使用數據框,您可以根據需要查詢數據。 –

+1

@RamPrasadG你不需要創建配置單元表。 Spark可以做到這一點。無論如何,也許我會回答這個問題,而不是;) –

+0

@GlennieHellesSindholt:這意味着,Spark可以解釋像「/客戶/ * /日期/ * /產品/ 123」路徑? –

回答

9

因此,在Spark中,您可以按照您要查找的方式保存和讀取分區數據。然而,而不是創建路徑像你這樣當你使用保存數據/customer/a/date/b/product/c星火將使用這種約定/customer=a/date=b/product=c

df.write.partitionBy("customer", "date", "product").parquet("/my/base/path/") 

當你需要在讀取數據時,需要指定basepath-option這樣的:

sqlContext.read.option("basePath", "/my/base/path/").parquet("/my/base/path/customer=*/date=*.2015/product=*/") 

以下的所有內容將被解釋爲Spark的列。在此處給出的示例中,Spark會將三列customer,dateproduct添加到數據框中。請注意,您可以根據需要爲任何列使用通配符。

對於讀取特定時間範圍內的數據,您應該知道Spark使用謂詞下推,因此它只會實際將數據加載到符合條件的內存中(如某些過濾器轉換所指定的那樣)。但是如果你真的想明確指定範圍,你可以生成一個路徑名列表,然後將其傳遞給讀取函數。就像這樣:

val pathsInMyRange = List("/my/path/customer=*/date=01.2015/product=*", 
          "/my/path/customer=*/date=02.2015/product=*", 
          "/my/path/customer=*/date=03.2015/product=*"..., 
          "/my/path/customer=*/date=09.2015/product=*") 

sqlContext.read.option("basePath", "/my/base/path/").parquet(pathsInMyRange:_*) 

無論如何,我希望這有助於:)

+0

謝謝!看起來不錯,只是試了一下 - 它沒有劃分的工作...當我使用「df.write.partitionBy」我得到一個例外,請參閱上面編輯的代碼。 –

+0

它現在工作!謝謝你的回答,@ glennie-helles-sindholt!由於我試圖用一列(不切實際的測試用例)對一個表進行分區,所以發生異常,所以在這裏你至少需要兩列來使它工作! –