2016-04-25 45 views
0

我需要從HDFS很多的gzip的閱讀,是這樣的:(「* GZ」) sc.textFile 而其中一些的gzip的損壞,提高如何跳過損壞的gzips與pyspark?

產生java.io.IOException: gzip流CRC失敗

停止整個處理運行。

我讀了辯論here,其中有人有相同的需求,但沒有得到明確的解決方案。由於在火花內實現此功能是不合適的(根據鏈接),有什麼方法可以粗暴地跳過損壞的文件嗎?似乎有scala用戶的提示,不知道如何在python中處理它。

或者我只能檢測到損壞的文件,並刪除它們?

如果我有大量的gzip,並且在跑完一天之後發現最後一個已損壞,該怎麼辦。整天浪費了。並且損壞的gzip很常見。

回答

0

您可以手動列出所有文件,然後通過地圖UDF中的文件進行讀取。然後,UDF可以嘗試/除了塊來處理損壞的文件。

的代碼會看起來像

import gzip 
from pyspark.sql import Row 

def readGzips(fileLoc): 
    try: 
     ... 
     code to read file 
     ... 
     return record 
    except: 
     return Row(failed=fileLoc) 

from os import listdir 
from os.path import isfile, join 
fileList = [f for f in listdir(mypath) if isfile(join(mypath, f))] 

pFileList = sc.parallelize(fileList) 
dataRdd = pFileList.map(readGzips).filter((lambda x: 'failed' not in x.asDict())) 
+0

所以「代碼讀取文件」的部分是使用Python的gzip的模塊,以解壓縮文件?輸入是路徑字符串的列表,我的文件在hdfs上,我如何使用原始python來讀取它們? – zhangcx93

+0

你問如何從hdfs讀取文件?或者如何讀取gzip? http://stackoverflow.com/questions/10566558/python-read-lines-from-compressed-text-files描述如何使用gzip。而hdfs應該可以通過「hdfs://path/to/file.gz」獲得, – David

+0

我的意思是在readGzips()中,只能使用python從hdfs訪問文件,外部包,因爲open('hdfs://file')會引發IOError。另外,以這種方式,mesos和spark不能正確指定工作以使機器具有文件來處理數據,因此比使用sc.textFile()正常讀取文件的方式慢。 – zhangcx93