2017-05-08 60 views
0

我正在使用wholeTextFiles來讀取目錄中的每個文件。之後,我使用map在rdd的每個元素上調用一個函數。整個程序僅使用每個文件的50行。代碼如下:apache spark:從目錄中讀取大尺寸文件

def processFiles(fileNameContentsPair): 
    fileName= fileNameContentsPair[0] 
    result = "\n\n"+fileName 
    resultEr = "\n\n"+fileName 
    input = StringIO.StringIO(fileNameContentsPair[1]) 
    reader = csv.reader(input,strict=True) 

    try: 
     i=0 
     for row in reader: 
     if i==50: 
      break 
     // do some processing and get result string 
     i=i+1 
    except csv.Error as e: 
    resultEr = resultEr +"error occured\n\n" 
    return resultEr 
    return result 



if __name__ == "__main__": 
    inputFile = sys.argv[1] 
    outputFile = sys.argv[2] 
    sc = SparkContext(appName = "SomeApp") 
    resultRDD = sc.wholeTextFiles(inputFile).map(processFiles) 
    resultRDD.saveAsTextFile(outputFile) 

目錄中的每個文件的大小可以在我的情況非常大,因爲這個原因使用wholeTextFiles API的將是在這種情況下,效率不高。有沒有有效的方法來做到這一點?我可以考慮逐個遍歷目錄中的每個文件,但這似乎效率不高。我是新來的火花。請讓我知道是否有任何有效的方法來做到這一點。

+1

每個文件的大小有多大?你不能把文件分割成更小的文件嗎? –

+0

@DatTran每個文件的大小可以是幾Gbs,並且目錄中的文件數量可以大於100.我認爲可以將文件拆分的一種方法是逐個拆分每個文件,並從每個文件中取出第一個拆分文件並保留這些文件在臨時目錄中。之後,我們可以在該臨時目錄上應用'wholeTextFiles'。這是你建議分割文件的方式嗎?如果沒有,請讓我知道你會建議如何拆分文件? – mcurious

回答

1

好吧,我建議將您的文件先拆分成更小的塊,幾個千兆字節太大而無法讀取,這是您延遲的主要原因。如果你的數據在HDFS上,你可以爲每個文件提供64MB的內容。否則,您應該嘗試使用文件大小,因爲它取決於您擁有的執行程序的數量。所以如果你有更多的小塊,你可以增加這個來獲得更多的並行性。同樣,你也可以增加你的分區來調整它,因爲你的processFiles函數似乎不是CPU密集型的。許多執行程序唯一的問題是I/O增加,但是如果文件很小,應該不是什麼大問題。

順便說一下,不需要臨時目錄,wholeTextFiles支持像*這樣的通配符。另外請注意,如果您將S3用作文件系統,那麼如果您的小文件過多,則可能會出現瓶頸,因爲讀取可能需要一段時間而不是大文件。所以這不是微不足道的。

希望這會有所幫助!