我需要運行一個動態的mapreduce作業,因爲每次運行mapreduce作業(例如,響應用戶請求)時都需要將參數傳遞到地圖並減少函數。如何動態地將參數傳遞給GAE mapreduce上的映射函數?
我該如何做到這一點?我無法在文檔中的任何地方看到如何在運行時爲map和reduce執行動態處理。
class MatchProcessing(webapp2.RequestHandler):
def get(self):
requestKeyID=int(self.request.get('riderbeeRequestID'))
userKey=self.request.get('userKey')
pipeline = MatchingPipeline(requestKeyID, userKey)
pipeline.start()
self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)
class MatchingPipeline(base_handler.PipelineBase):
def run(self, requestKeyID, userKey):
yield mapreduce_pipeline.MapreducePipeline(
"riderbee_matching",
"tasks.matchingMR.riderbee_map",
"tasks.matchingMR.riderbee_reduce",
"mapreduce.input_readers.DatastoreInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"entity_kind": "models.rides.RiderbeeRequest",
"requestKeyID": requestKeyID,
"userKey": userKey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=16)
def riderbee_map(riderbeeRequest):
# would like to access the requestKeyID and userKey parameters that were passed in mapper_params
# so that we can do some processing based on that
yield (riderbeeRequest.user.email, riderbeeRequest.key().id())
def riderbee_reduce(key, values):
# would like to access the requestKeyID and userKey parameters that were passed earlier, perhaps through reducer_params
# so that we can do some processing based on that
yield "%s: %s\n" % (key, len(values))
請幫忙嗎?
FYI ...這裏是如何將數據發送到工作中爪哇 - HTTP:// WWW。 thecloudavenue.com/2011/11/passing-parameters-to-mappers-and.html –
嗯。你給的鏈接指向Hadoop的東西。這是GAE MapReduce ... –