2012-11-20 33 views
0

我在表單數據調試Hadoop的流編程'

id, movieid  , date, time 
3710100, 13502, 2012-09-10, 12:39:38.000 

現在基本上是我想要做的就是這個..

我想找出,一個特定的電影多少次7之間觀看上午11時,在30分鐘的時間間隔

所以基本上..

多少次電影已

之間觀看
6 and 6:30 
    6:30 and 7 
    7 and 7:30 
    ... 
    10:30-11 

所以我寫了mapper和reducer來實現這個。

mapper.py 

#!/usr/bin/env python 

import sys 

# input comes from STDIN (standard input) 
for line in sys.stdin: 
    # remove leading and trailing whitespace 
    line = line.strip() 
    # split the line into words 
    line = line.split(",") 
    #print line 

    print '%s\t%s' % (line[1], line) 

reducer.py

#!/usr/bin/env python 

import sys 
import datetime 
from collections import defaultdict 



def convert_str_to_date(time_str): 
    try: 
     timestamp = datetime.datetime.strptime(time_str, '%Y-%m-%d:%H:%M:%S.000') #00:23:51.000 


     return timestamp 

    except Exception,inst: 

     pass 

def is_between(time, time1,time2): 
    return True if time1 <= time < time2 else False 


def increment_dict(data_dict, se10,date_time): 
    start_time = datetime.datetime(date_time.year,date_time.month,date_time.day, 07,00,00) 
    times = [start_time] 
    for i in range(8): 
     start_time += datetime.timedelta(minutes = 30) 
     times.append(start_time) 
    for i in range(len(times) -1): 
     if is_between(date_time, times[i], times[i+1]): 
      data_dict[se10][i] += 1 






keys = [0,1,2,3,4,5,6,7] 



data_dict = defaultdict(dict) 


# input comes from STDIN 
def initialize_entry(se10): 
    for key in keys: 
     data_dict[se10][key] = 0 

for line in sys.stdin: 
    # remove leading and trailing whitespace 
    line = line.strip() 


    # parse the input we got from mapper.py 

    se10, orig_data = line.split('\t') 
    initialize_entry(se10) 
    parse_line = orig_data.split(",") 

    datestr = parse_line[2].replace(" ","").replace("'","") 
    timestr = parse_line[3].replace(" ","").replace("'","") 

    date_time = datestr + ":" + timestr 

    time_stamp = convert_str_to_date(date_time) 

    increment_dict(data_dict, se10,time_stamp) 


for key, secondary_key in data_dict.items(): 
    for skey, freq in secondary_key.items(): 
     print key,"," ,skey,",",freq 

上面的代碼,如果我做

cat input.txt | python mapper.py | sort | python reducer.py 

但是當我部署在集羣上運行良好。它沒有說這個工作已經被殺死了,這個原因是不知道的。

請幫忙。

謝謝。

回答

0

好吧,我想通了這件事了..

的主要問題是,我的工作的本地機器是基於Windows ..而集羣是基於linux ..

,所以我不得不寫的文件轉換在DOS下UNIX格式..

0

它通常是通過JobHistory日誌,改爲在https://stackoverflow.com/a/24509826/1237813描述的好主意。它應該給你更多的細節爲什麼工作失敗。

關於行尾時,類Hadoop的流使用由默認分割線是TextInputFormat。它與Windows換行符used to break,但自2006年以來它應該工作得很好。

這會讓您的映射器和reducer腳本成爲問題的可能來源。 Python 3使用了一種叫做universal newlines的東西,它應該只用Unix和Windows換行符就可以工作。在Python 2.7中,你需要明確地打開它。

在Linux和Mac OS X您可以重新打開像這樣sys.stdin = open('/dev/stdin', 'U')啓用通用換行符標準輸入。我手邊沒有Windows電腦試用,但以下內容應適用於所有三個系統:

import os 
import sys 

# reopen sys.stdin 
os.fdopen(sys.stdin.fileno(), 'U') 

for line in sys.stdin: 
    …