2012-06-29 57 views
7

我需要運行一個動態的mapreduce作業,因爲每次運行mapreduce作業(例如,響應用戶請求)時都需要將參數傳遞到地圖並減少函數。如何動態地將參數傳遞給GAE mapreduce上的映射函數?

我該如何做到這一點?我無法在文檔中的任何地方看到如何在運行時爲map和reduce執行動態處理。

class MatchProcessing(webapp2.RequestHandler): 

    def get(self): 
     requestKeyID=int(self.request.get('riderbeeRequestID')) 
     userKey=self.request.get('userKey') 
     pipeline = MatchingPipeline(requestKeyID, userKey) 
     pipeline.start() 
     self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id) 


class MatchingPipeline(base_handler.PipelineBase): 
    def run(self, requestKeyID, userKey): 
     yield mapreduce_pipeline.MapreducePipeline(
      "riderbee_matching", 
      "tasks.matchingMR.riderbee_map", 
      "tasks.matchingMR.riderbee_reduce", 
      "mapreduce.input_readers.DatastoreInputReader", 
      "mapreduce.output_writers.BlobstoreOutputWriter", 
      mapper_params={ 
       "entity_kind": "models.rides.RiderbeeRequest", 
       "requestKeyID": requestKeyID, 
       "userKey": userKey, 
      }, 
      reducer_params={ 
       "mime_type": "text/plain", 
      }, 
      shards=16) 


def riderbee_map(riderbeeRequest): 
    # would like to access the requestKeyID and userKey parameters that were passed in mapper_params 
    # so that we can do some processing based on that 

    yield (riderbeeRequest.user.email, riderbeeRequest.key().id()) 


def riderbee_reduce(key, values): 
    # would like to access the requestKeyID and userKey parameters that were passed earlier, perhaps through reducer_params 
    # so that we can do some processing based on that 

    yield "%s: %s\n" % (key, len(values)) 

請幫忙嗎?

+0

FYI ...這裏是如何將數據發送到工作中爪哇 - HTTP:// WWW。 thecloudavenue.com/2011/11/passing-parameters-to-mappers-and.html –

+0

嗯。你給的鏈接指向Hadoop的東西。這是GAE MapReduce ... –

回答

5

我敢肯定你可以在mapper_parameters中指定參數,並從上下文模塊中讀取它們。有關更多詳細信息,請參見http://code.google.com/p/appengine-mapreduce/wiki/UserGuidePython#Mapper_parameters

+0

除了在鏈接中引用這些參數不是動態的(即,不在運行時以編程方式傳遞),而是從靜態的mapreduce.yaml中讀取它們。也就是說,除非我誤解了這將如何工作 –

+0

在上面使用的代碼中,MapreducePipeline構造函數有一個mapper_params參數;還有一個mapreduce_parameters參數。這些相當於來自yaml文件的參數。請參閱此處的代碼:http://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/control.py –

4

這是如何從映射函數訪問映射器參數,使用上下文模塊:

from mapreduce import context 

def riderbee_map(riderbeeRequest): 
    ctx = context.get() 
    params = ctx.mapreduce_spec.mapper.params 
    requestKeyID = params["requestKeyID"] 
相關問題