我正在使用wholeTextFiles
來讀取目錄中的每個文件。之後,我使用map
在rdd的每個元素上調用一個函數。整個程序僅使用每個文件的50行。代碼如下:apache spark:從目錄中讀取大尺寸文件
def processFiles(fileNameContentsPair):
fileName= fileNameContentsPair[0]
result = "\n\n"+fileName
resultEr = "\n\n"+fileName
input = StringIO.StringIO(fileNameContentsPair[1])
reader = csv.reader(input,strict=True)
try:
i=0
for row in reader:
if i==50:
break
// do some processing and get result string
i=i+1
except csv.Error as e:
resultEr = resultEr +"error occured\n\n"
return resultEr
return result
if __name__ == "__main__":
inputFile = sys.argv[1]
outputFile = sys.argv[2]
sc = SparkContext(appName = "SomeApp")
resultRDD = sc.wholeTextFiles(inputFile).map(processFiles)
resultRDD.saveAsTextFile(outputFile)
目錄中的每個文件的大小可以在我的情況非常大,因爲這個原因使用wholeTextFiles
API的將是在這種情況下,效率不高。有沒有有效的方法來做到這一點?我可以考慮逐個遍歷目錄中的每個文件,但這似乎效率不高。我是新來的火花。請讓我知道是否有任何有效的方法來做到這一點。
每個文件的大小有多大?你不能把文件分割成更小的文件嗎? –
@DatTran每個文件的大小可以是幾Gbs,並且目錄中的文件數量可以大於100.我認爲可以將文件拆分的一種方法是逐個拆分每個文件,並從每個文件中取出第一個拆分文件並保留這些文件在臨時目錄中。之後,我們可以在該臨時目錄上應用'wholeTextFiles'。這是你建議分割文件的方式嗎?如果沒有,請讓我知道你會建議如何拆分文件? – mcurious