2013-12-19 39 views
1

我想在mapreduce作業定稿/完成後執行自定義回調函數。GAE上的Mapreduce Python - 導致ReducePipeline在完成時發出回調?

我發現這個問題唯一有用的參考文獻是a somewhat outdated Google site和一個相關的,但看起來已經過時Stackoverflow question

這些來源假設我用​​揭開序幕MapReduce作業,並依賴於一個事實,即start_map需要一個關鍵字參數mapreduce_parameters,其中一個可以指定一個done_callback參數來指定要在完成被稱爲URL。但是,我用不同的方法(據我所知越近,首選1),其中一個自定義管道的run方法產生MapReduce的管道:

yield mapreduce_pipeline.MapreducePipeline(
    "word_count", 
    "main.word_count_map", 
    "main.word_count_reduce", 
    "mapreduce.input_readers.BlobstoreZipInputReader", 
    "mapreduce.output_writers.BlobstoreOutputWriter", 
    mapper_params={ 
     "blob_key": blobkey, 
    }, 
    reducer_params={ 
     "mime_type": "text/plain", 
    }, 
    shards=16) 

爲MapreducePipeline簽名不允許一個mapreduce_parameters說法。我可以看到在源代碼中回調出現的引用的唯一地方是mapper_pipeline.MapperPipeline.run,但它似乎只在內部使用。

那麼,有沒有辦法讓那裏的回調參數?

如果沒有,是否有人對在哪裏以及如何擴展庫提供這樣的功能有很好的想法?

回答

1

設置我的MapReduce管道範式看起來有點像下面這樣:

class MRRecalculateSupportsPipeline(base_handler.PipelineBase): 

    def run(self, user_key): 
     # ... 
     yield mapreduce_pipeline.MapreducePipeline('user_recalculate_supports', 
       'myapp.mapreduces.user_recalculate_supports_map', 
       'myapp.mapreduces.user_recalculate_supports_reduce', 
       'mapreduce.input_readers.DatastoreInputReader', output_writer_spec=None, 
       mapper_params={"""..."""}) 

如果你想捕捉這條管線的建成,你有兩個選擇。

A)使用管道。在MR管道完成後運行完成管道。 B)使用頂級管道的finalized方法來處理完成。使用頂級管道的finalized方法來處理完成。就我個人而言,我會堅持選項A,因爲您可以跟蹤/_ah/*/status?root=視圖中的路徑。

class EmailNewReleasePipeline(base_handler.PipelineBase): 
    """Email followers about a new release""" 
    # TODO: product_key is the name of the parameter, but it's built for albums ... 

    def run(self, product_key, testing=False): 
      # Send those emails ... 
      yield mapreduce_pipeline.MapreducePipeline(...) 

    def finalized(self): 
     """Save product as launched""" 
     ... 
     product.launched = True 
     product.put() 

以下是關於finalization of a pipeline的文檔。

+0

)你知道什麼是pipeline嗎?之後或爲什麼它是neceesay?根據我的經驗,簡單地產生後處理管道將執行它的代碼在主苯胺完成後。這也是如何檢索[GAE MR Hello World應用](https://developers.google.com/appengine/docs/python/dataprocessing/helloworld)中的輸出文件blobkey,所以我沒有想到它。 – Alice

+0

pipeline.After需要確保生成的管道的正確執行順序。到目前爲止,我最喜歡的管道功能之一是,在文檔中:https://code.google.com/p/appengine-pipeline/wiki/GettingStarted#Execution_ordering – Josh

+0

明白了。謝謝。 – Alice

0

對於這個問題,至少一個不那麼多的投資解決方法是簡單地產生另一個執行所需後處理的Map/Mapreduce管道。

例如爲:

class MainPipeline(base_handler.PipelineBase): 
    def run(self): 
     mapper_params = { ... } 
     reducer_params = { ... } 
     yield mapreduce_pipeline.MapReducePipeline(
      ..., 
      mapper_params=mapper_params, 
      reducer_params=reducer_params) 
     yield PostprocessPipeline(reducer_params) 


class PostprocessPipeline(base_handler.PipelineBase): 
    def run(self, reducer_params): 
     do_some_postprocessing(reducer_params) 

那解決方法不能訪問到MapReduce的狀態,我想可以以某種方式從管道ID檢索到的,但它尚未明顯對我怎麼樣。因此,您必須設置另一個標誌/ memcache/ds條目來檢查管道是否成功完成(如果這與後處理有關)。

+0

爲此目的使用memcache是​​令人擔憂的 - 從不保證結果可用。但是,我同意產生另一條管線的想法。上面肯定缺少的一件事就是使用'pipeline.After((yield yield mapreduce_pipeline.MapReducePipeline ...' – Josh

相關問題