2017-01-10 62 views
0

我有一些日常的數據保存到文件夾多(主要是基於時間)。現在我有兩種格式來存儲文件一個是實木複合地板,另一個是csv,我想保存爲鑲木地板格式以節省一些空間。 的文件夾結構類似於以下內容:如何讓火花2.0閱讀輯陣文件夾鑲如CSV

[[email protected] raw]# tree 
. 
├── entityid=10001 
│   └── year=2017 
│    └── quarter=1 
│     └── month=1 
│      ├── day=6 
│      │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
│      └── day=7 
│       └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
├── entityid=100055 
│   └── year=2017 
│    └── quarter=1 
│     └── month=1 
│      ├── day=6 
│      │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
│      └── day=7 
│       └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
├── entityid=100082 
│   └── year=2017 
│    └── quarter=1 
│     └── month=1 
│      ├── day=6 
│      │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
│      └── day=7 
│       └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
└── entityid=10012 
    └── year=2017 
     └── quarter=1 
      └── month=1 
       ├── day=6 
       │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
       └── day=7 
        └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 

現在我有一個Python列表存儲所有文件夾需要讀,假設每次運行只需要閱讀一些關於過濾條件的文件夾基地。

folderList=df_inc.collect() 
folderString=[] 

for x in folderList: 
    folderString.append(x.folders) 
In [44]: folderString 
Out[44]: 
[u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=7', 
u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=6', 
u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=7', 
u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=6', 
u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=6', 
u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=7'] 

該文件由writen:

df_join_with_time.coalesce(1).write.partitionBy("entityid","year","quarter","month","day").mode("append").parquet(rawFolderPrefix) 

當我嘗試閱讀df_batch=spark.read.parquet(folderString)錯誤java.lang.ClassCastException存儲在folderString的文件夾:java.util.ArrayList中不能轉換爲Java。 lang.String遇到。

,如果我將文件保存爲CSV格式閱讀下面通過它就可以工作細如下面的代碼:請無論如何,如果讀木地板文件夾中的文件列表,非常感激!

In [46]: folderList=df_inc.collect() 
    ...: folderString=[] 
    ...: 
    ...: for x in folderList: 
    ...:  folderString.append(x.folders) 
    ...: df_batch=spark.read.csv(folderString) 
    ...: 

In [47]: df_batch.show() 
+------------+---+-------------------+----------+----------+ 
|   _c0|_c1|    _c2|  _c3|  _c4| 
+------------+---+-------------------+----------+----------+ 
|6C25B9C3DD54| 1|2017-01-07 00:00:01|1483718401|1483718400| 
|38BC1ADB0164| 3|2017-01-06 00:00:01|1483632001|1483632000| 
|38BC1ADB0164| 3|2017-01-07 00:00:01|1483718401|1483718400| 

回答

0

我得到這個要解決:

df=spark.read.parquet(folderString[0]) 
y=0 
for x in folderString: 
    if y>0: 
     df=df.union(spark.read.parquet(x)) 
    y=y+1 

這是一個非常醜陋的解決方案,如果您有好的想法,請讓我知道。非常感謝。

幾天後,發現解決問題的最佳方式:

df=spark.read.parquet(*folderString) 
+0

但是,如果你指向原始文件夾。像'spark.read。實木複合地板('/ data/raw')'它會正常工作。 –

+0

感謝您的評論,我不想讀取/ data/raw下的所有文件夾,只有其中的一些存儲在列表變量folderString –

1

你正面臨Hadoop和平面分區的小姐認識。

你看,我有按年按月分區一個簡單的文件結構。它是這樣的:

my_folder 
. 
├── year-month=2016-12 
| └── my_files.parquet 
├── year-month=2016-11 
| └── my_files.parquet 

如果我做從my_folder讀取,而不在我的數據幀的讀者任何過濾器是這樣的:

df = saprk.read.parquet("path/to/my_folder") 
df.show() 

如果檢查星火DAG可視化,你可以看到,在這種情況下,它像你說的會讀我的所有分區:

enter image description here

在上述情況下,在第一方陣的每個點爲鄰ne數據分區。

但是,如果我我的代碼改成這樣:

df = saprk.read.parquet("path/to/my_folder")\ 
      .filter((col('year-month') >= lit(my_date.strftime('%Y-%m'))) & 
        (col('year-month') <= lit(my_date.strftime('%Y-%m')))) 

的DAG可視化將顯示我多少分區使用:

enter image description here

所以,如果你被列篩選那就是你不會讀取所有文件的分區。只要您需要,您就不需要使用按文件夾讀取一個文件夾的解決方案。

+0

嗨Thiago,謝謝你的迴應,我的問題是,文件夾我需要閱讀是隨機的。它完全基於當前的增量文件,並且該文件可能包含任何一天的數據,我可以按照當天的文件分組來找出它們。這就是df_inc。一旦我知道哪一天(當然包括今天)需要重新計算,我會同時閱讀歷史文件和今天的增量更改。所以我希望我能讀取所有包含df_inc的文件,並且無法提供精確的過濾器。如果我用csv方法讀取,那麼完全相同的方法就可以完美工作。 –