2012-02-13 60 views
3

我有一個巨大的CSV文件,我想在Amazon EMR(python)上使用Hadoop MapReduce進行處理。在Python中使用Hadoop來處理一個大的csv文件

該文件有7個領域,但是,我只在看日期數量場。

"date" "receiptId" "productId" "quantity" "price" "posId" "cashierId" 

首先,我mapper.py

import sys 

def main(argv): 
    line = sys.stdin.readline() 
    try: 
     while line: 
      list = line.split('\t') 

      #If date meets criteria, add quantity to express key 
       if int(list[0][11:13])>=17 and int(list[0][11:13])<=19: 
        print '%s\t%s' % ("Express", int(list[3])) 
      #Else, add quantity to non-express key 
       else: 
        print '%s\t%s' % ("Non-express", int(list[3])) 

      line = sys.stdin.readline() 
except "end of file": 
     return None 
if __name__ == "__main__": 
     main(sys.argv) 

對於減速,我將使用流命令:集料。

問:

  1. 是我的代碼嗎?我在Amazon EMR中運行它,但是我得到了一個空輸出。

  2. 所以我最終的結果應該是:表達,XXX和非表達,YYY。在返回結果之前,我可以讓它做分割操作嗎?只是XXX/YYY的結果。我應該在哪裏放這個代碼?減速器??

  3. 此外,這是一個巨大的CSV文件,所以將映射分解成幾個分區?或者我需要顯式調用FileSplit?如果是這樣,我該怎麼做?

+0

爲什麼不使用python內置的csv解析器? – 2012-02-14 23:18:15

回答

3

在這裏回答我自己的問題!

  1. The code is wrong。如果您使用聚合庫進行縮減,則您的輸出不會遵循常用的鍵值對。它需要一個「前綴」。

    if int(list[0][11:13])>=17 and int(list[0][11:13])<=19: 
        #This is the correct way of printing for aggregate library 
        #Print all as a string. 
        print "LongValueSum:" + "Express" + "\t" + list[3] 
    

    其他的 「前綴」 可以是:DoubleValueSum,LongValueMax,LongValueMin,StringValueMax,StringValueMin,UniqValueCount,ValueHistogram。欲瞭解更多信息,請看這裏http://hadoop.apache.org/common/docs/r0.15.2/api/org/apache/hadoop/mapred/lib/aggregate/package-summary.html

  2. 是的,如果您想要做的不僅僅是基本總和,最小值,最大值或者計數,您需要編寫自己的reducer。

  3. 我還沒有答案。

+0

感謝您使用前綴「LongValueSum:」指針。 – Suman 2012-07-25 16:52:29

+0

@Deyang嗨,我是新來的hadoop -python。我也有類似的工作要做,但我在hadoop目錄中有多個csv文件,我已經編寫了在本地機器上正常運行的腳本。當我在羣集上運行它時,它會給出一個錯誤,因爲「Streaming Command Failed」。你能建議如何從hdfs目錄中讀取所有的csv文件。 – MegaBytes 2015-04-22 06:17:36