2012-06-19 37 views
2

App Engine Mapreduce API是否根據最終reduce工作中的邏輯決定計算分片大小?設置App Engine mapreduce分片大小

我正在使用App Engine mapreduce API並提供了kwarg來設置我的mapreduce分片大小。

在我的mapreduce作業中,分片大小尤其重要,因爲我不想將太多結果批量分配給執行reduce函數最後一步的任何結果。換句話說,我正在硬編碼碎片大小,以根據系統上的外部約束將用戶平均分配。

地圖作業似乎分裂得很好,但減速器只使用我指定的一小部分碎片。

這裏是排序的代碼我處理的粗略輪廓:

SHARD_SIZE = 42 

def map_fun(entity): 
    shard_key = random.randint(1, SHARD_SIZE) 
    yield (
    shard_key, 
    db.model_to_protobuf(entity).SerializeToString().encode('base64') 
) 

def reduce_fun(key, entities): 
    batch = [] 
    for entity in entities: 
    #check for stuff 
    batch.append(entity) 
    expensive_side_effect(batch) 


class MyGreatPipeline(base_handler.PipelineBase): 
    def run(self, *args, **kw): 
    yield mapreduce_pipeline.MapreducePipeline(
     'label' 
     'path.to.map_fun', 
     'path.to.reduce_fun', 
     'mapreduce.input_readers.DatastoreInputReader', 
     'mapreduce.output_writers.BlobstoreOutputWriter', 
     mapper_params={ 
     'entity_kind': 'path.to.entity', 
     'queue_name': 'coolQueue' 
     }, 
     reducer_params={}, 
     shard_size = SHARD_SIZE 
    ) 

map_fun特別指定每個實體對根據碎片大小隨機確定一個碎片。我很困惑,爲什麼我的reducer將會有比SHARD_SIZE更少的碎片,因爲有很多實體,並且極不可能重複選擇相同的整數。

回答

0

我很困惑你在這裏做什麼。使用映射階段將東西分組到一個小的分片密鑰中,稍後在減少的時間處理這些密鑰看起來很奇怪。即使你做了很多減少工作人員的工作,但是你做的每個關鍵都會有太多的工作要做。

正在處理的'批'是隨機任意的,所以我假設expensive_side_effect()不依賴於該批的內容。爲什麼不在地圖時間做這項工作,發佈一個減少可以傳遞給輸出編寫器的東西?

相關問題