2013-08-01

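Passing arguments to a reducer in MRjob

I'm using MRjob to run Hadoop Streaming jobs on our HBase instance. For the life of me, I can't figure out how to pass arguments to my reducer. There are two values, startDate and endDate, that I want to hand to the reducer when I run the job. Here is what my reducer currently looks like: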

def reducer(self, groupId, meterList):
    """
    Print bucket.
    """
    sys.stderr.write("Working on group = " + str(groupId) + "\n")
    #print "Opening connection..."
    conn = open_connection(hostname)
    #print "Getting table..."
    table = get_table(conn, tableName)

    compositeDf = DataFrame()

    for meterId in meterList:
        sys.stderr.write("Querying: " + str(meterId) + "\n")
        df = extract_meter_data(table, meterId, startDate, endDate)

I can't seem to pass startDate and endDate to my reducer as arguments. The only way I can get them in is through global variables:

import datetime

from mrjob.job import MRJob

startDate = datetime.datetime(2012, 6, 10)
endDate = datetime.datetime(2012, 6, 11)

class MRDataQuality(MRJob):
    """
    MapReduce job that does a data quality check on the meter data in HBase.
    """

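But that's messy. I want to pass the values in when I invoke the job. I've tried many approaches: setting them as instance variables, setting them as static class variables, writing an overloaded constructor for MRDataQualityJob... nothing seems to work. I call the job programmatically from my top-level script like this: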

if args.hadoop: 
    mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile]) 
else: 
    mrdq_job = MRDataQuality(args=[meterFile]) 

with mrdq_job.make_runner() as runner: 
    runner.run() 

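No matter what I do, it seems that runner.run() (called on the mrdq_job instance) uses a fresh instance of the class, one that doesn't have the instance or static variables I defined. How can I pass my arguments to the reducer? In plain Hadoop Streaming I can do this by passing a string: "--reducer reducer.py arg1 arg2". Does MRjob have an equivalent?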
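For comparison, in plain Hadoop Streaming the reducer is an ordinary script, so any extra words after the script name in the reducer command show up in `sys.argv`. The sketch below is a minimal, hypothetical plain-Streaming reducer, assuming tab-separated `key<TAB>value` input lines (sorted by key, as Streaming delivers them) and dates passed as `YYYY-MM-DD`; the helper names `parse_date_args`, `group_lines` and `main` are illustrative, not part of MRjob or Hadoop.

```python
import datetime
import itertools
import sys


def parse_date_args(argv):
    """Turn the two extra command-line words (arg1 arg2) into datetimes."""
    # argv[0] is the script name; argv[1] and argv[2] are the dates.
    start = datetime.datetime.strptime(argv[1], "%Y-%m-%d")
    end = datetime.datetime.strptime(argv[2], "%Y-%m-%d")
    return start, end


def group_lines(lines):
    """Group sorted 'key<TAB>value' lines into (key, [values]) pairs."""
    # partition() never fails: a line without a tab yields an empty value.
    triples = (line.rstrip("\n").partition("\t") for line in lines)
    for key, group in itertools.groupby(triples, key=lambda t: t[0]):
        yield key, [t[2] for t in group]


def main(argv, stdin, stdout):
    start_date, end_date = parse_date_args(argv)
    for group_id, meter_ids in group_lines(stdin):
        # Placeholder for the real per-meter work (e.g. an HBase query
        # per meter between start_date and end_date).
        stdout.write("%s\t%d\t%s\t%s\n" % (
            group_id, len(meter_ids), start_date.date(), end_date.date()))


# Only act as a reducer when both date arguments were supplied.
if __name__ == "__main__" and len(sys.argv) == 3:
    main(sys.argv, sys.stdin, sys.stdout)
```

MRjob does not work this way because it re-runs your job script for every task and rebuilds the job object from command-line options, which is why attributes set on the instance in the launcher never reach the reducer.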

Answers


How about passing your parameters through the job configuration and then reading them with get_jobconf_value?

Something like this:

import sys

from mrjob.compat import get_jobconf_value
from mrjob.job import MRJob

class MRDataQuality(MRJob):

    def reducer(self, groupId, meterList):
        # ... open the connection and get `table` as before ...
        # jobconf values arrive as strings, e.g. "2013-06-10".
        startDate = get_jobconf_value("my.job.settings.startdate")
        endDate = get_jobconf_value("my.job.settings.enddate")

        for meterId in meterList:
            sys.stderr.write("Querying: " + str(meterId) + "\n")
            df = extract_meter_data(table, meterId, startDate, endDate)

Then set the parameters in code, as you did above:

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile]) 

Note that `get_jobconf_value` is deprecated; use `jobconf_from_env` instead. https://pythonhosted.org/mrjob/utils-compat.html#mrjob.compat.jobconf_from_env – dranxo
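Under Hadoop Streaming, job configuration properties reach each task as environment variables whose names have the dots replaced by underscores, and `jobconf_from_env` looks the value up there. The sketch below imitates that lookup in plain Python so it runs without a cluster, and converts the string back into a `datetime`, since jobconf values always arrive as strings. `read_jobconf` and `parse_jobconf_date` are hypothetical helpers, not mrjob functions; the `YYYY-MM-DD` format matches the `--jobconf` values in the answer above.

```python
import datetime
import os
import re


def read_jobconf(name, default=None, environ=os.environ):
    """Look up a jobconf value the way a Streaming task sees it.

    Hypothetical helper: the property name is turned into an environment
    variable name by replacing non-alphanumerics with underscores, e.g.
    my.job.settings.startdate -> my_job_settings_startdate.
    """
    env_name = re.sub(r"[^A-Za-z0-9]", "_", name)
    return environ.get(env_name, default)


def parse_jobconf_date(name, environ=os.environ):
    """Read a YYYY-MM-DD jobconf value and return it as a datetime."""
    raw = read_jobconf(name, environ=environ)
    if raw is None:
        raise KeyError("jobconf value %r is not set" % name)
    return datetime.datetime.strptime(raw, "%Y-%m-%d")
```

Inside a real task you would call `jobconf_from_env("my.job.settings.startdate")` from `mrjob.compat` rather than `read_jobconf`, and do the string-to-date conversion once per task (for example in `reducer_init`) before passing the dates to `extract_meter_data`.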


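How about passing the parameters through the job configuration and then reading them with get_jobconf_value inside reducer_init? That way the parameters are read only once per reducer task instead of once per key.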

Something like this:

import sys

from mrjob.compat import get_jobconf_value
from mrjob.job import MRJob

class MRDataQuality(MRJob):

    def reducer_init(self):
        # Runs once per reducer task, before any call to reducer().
        # ... (e.g. open the connection and store the table as self.table) ...
        self.startDate = get_jobconf_value("my.job.settings.startdate")
        self.endDate = get_jobconf_value("my.job.settings.enddate")

    def reducer(self, groupId, meterList):
        for meterId in meterList:
            sys.stderr.write("Querying: " + str(meterId) + "\n")
            df = extract_meter_data(self.table, meterId, self.startDate, self.endDate)

Then set the parameters in code, as you did above:

mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', '--jobconf', 'my.job.settings.startdate=2013-06-10', '--jobconf', 'my.job.settings.enddate=2013-06-11', meterFile])
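Because the same `--jobconf` pairs are needed in both the Hadoop and the local branch of the question's launcher, a small helper can build the argument list once from the `datetime` values. A minimal sketch: `build_job_args` is a hypothetical name, the property names `my.job.settings.startdate` and `my.job.settings.enddate` are the ones used in the answers, the other flags are copied from the question, and it assumes the runner you use passes `--jobconf` values through to the tasks.

```python
import datetime


def build_job_args(meter_file, start, end, use_hadoop=False, reduce_tasks=42):
    """Return the args list for MRDataQuality(args=...).

    Hypothetical helper: encodes the dates as YYYY-MM-DD jobconf values so
    the reducer can read them back with get_jobconf_value/jobconf_from_env.
    """
    args = []
    if use_hadoop:
        # Hadoop-only options, as in the question's launcher.
        args += ["-r", "hadoop", "--conf-path", "mrjob.conf",
                 "--jobconf", "mapred.reduce.tasks=%d" % reduce_tasks]
    args += [
        "--jobconf", "my.job.settings.startdate=" + start.strftime("%Y-%m-%d"),
        "--jobconf", "my.job.settings.enddate=" + end.strftime("%Y-%m-%d"),
    ]
    # The input path goes after the options, as in the original call.
    args.append(meter_file)
    return args
```

In the launcher this would become `mrdq_job = MRDataQuality(args=build_job_args(meterFile, startDate, endDate, use_hadoop=args.hadoop))`, so the dates are defined in one place and both branches pass them to the job.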