我正在使用MRjob在我們的HBase實例上運行Hadoop Streaming作業。對於我的生活,我無法弄清楚如何將一個參數傳遞給我的reducer。我有兩個參數,我想從我運行作業時傳遞給reducer:startDate和endDate。這是我目前的減速機看起來像︰將參數傳遞給MRjob中的reducer
def reducer(self, groupId, meterList):
"""
Print bucket.
"""
sys.stderr.write("Working on group = " + str(groupId) + "\n")
#print "Opening connection..."
conn = open_connection(hostname)
#print "Getting table..."
table = get_table(conn, tableName)
compositeDf = DataFrame()
for meterId in meterList:
sys.stderr.write("Querying: " + str(meterId) + "\n")
df = extract_meter_data(table, meterId, startDate, endDate)
我似乎無法將startDate和endDate作爲參數傳遞給我的減速器。我可以通過一個全局變量來獲取參數。
startDate = datetime.datetime(2012, 6, 10)
endDate = datetime.datetime(2012, 6, 11)
class MRDataQuality(MRJob):
"""
MapReduce job that does a data quality check on the meter data in HBase.
"""
但這很髒。我想通過它來調用這個工作。我嘗試了很多方法。將它設置爲一個實例變量,將其設置爲一個靜態類變量,爲MRDataQualityJob創建一個重載的構造函數....似乎沒有任何工作。我從我的頂層腳本編程調用它像這樣:
if args.hadoop:
mrdq_job = MRDataQuality(args=['-r', 'hadoop', '--conf-path', 'mrjob.conf', '--jobconf', 'mapred.reduce.tasks=42', meterFile])
else:
mrdq_job = MRDataQuality(args=[meterFile])
with mrdq_job.make_runner() as runner:
runner.run()
無論我做什麼它似乎像runner.run(在mrdq_job實例)使用其沒有按類的新鮮的新實例沒有定義實例或靜態變量。我怎樣才能將我的參數傳遞給reducer?我可以在普通的Hadoop Streaming中通過傳遞一個字符串來實現:「--reducer reducer.py arg1 arg2」。 MRjob是否有任何等價物?
注意'get_jobconf_value'被貶值。 https://pythonhosted.org/mrjob/utils-compat.html#mrjob.compat.jobconf_from_env – dranxo