App Engine Mapreduce API是否根據最終reduce工作中的邏輯決定計算分片大小?設置App Engine mapreduce分片大小
我正在使用App Engine mapreduce API並提供了kwarg來設置我的mapreduce分片大小。
在我的mapreduce作業中,分片大小尤其重要,因爲我不想將太多結果批量分配給執行reduce函數最後一步的任何結果。換句話說,我正在硬編碼碎片大小,以根據系統上的外部約束將用戶平均分配。
地圖作業似乎分裂得很好,但減速器只使用我指定的一小部分碎片。
這裏是排序的代碼我處理的粗略輪廓:
SHARD_SIZE = 42
def map_fun(entity):
shard_key = random.randint(1, SHARD_SIZE)
yield (
shard_key,
db.model_to_protobuf(entity).SerializeToString().encode('base64')
)
def reduce_fun(key, entities):
batch = []
for entity in entities:
#check for stuff
batch.append(entity)
expensive_side_effect(batch)
class MyGreatPipeline(base_handler.PipelineBase):
def run(self, *args, **kw):
yield mapreduce_pipeline.MapreducePipeline(
'label'
'path.to.map_fun',
'path.to.reduce_fun',
'mapreduce.input_readers.DatastoreInputReader',
'mapreduce.output_writers.BlobstoreOutputWriter',
mapper_params={
'entity_kind': 'path.to.entity',
'queue_name': 'coolQueue'
},
reducer_params={},
shard_size = SHARD_SIZE
)
map_fun
特別指定每個實體對根據碎片大小隨機確定一個碎片。我很困惑,爲什麼我的reducer將會有比SHARD_SIZE
更少的碎片,因爲有很多實體,並且極不可能重複選擇相同的整數。