1
我嘗試從(http://code.google.com/p/appengine-mapreduce/)的mapreduce框架並稍微修改了演示應用程序(使用mapreduce.input_readers.DatastoreInputReader而不是mapreduce.input_readers.BlobstoreZipInputReader)。如何在MapReduce中設置輸出寫入器
我已經設置了2管道類:
class IndexPipeline(base_handler.PipelineBase):
def run(self):
output = yield mapreduce_pipeline.MapreducePipeline(
"index",
"main.index_map", #added higher up in code
"main.index_reduce", #added higher up in code
"mapreduce.input_readers.DatastoreInputReader",
mapper_params={
"entity_kind": "model.SearchRecords",
},
shards=16)
yield StoreOutput("Index", output)
class StoreOutput(base_handler.PipelineBase):
def run(self, mr_type, encoded_key):
logging.info("output is %s %s" % (mr_type, str(encoded_key)))
if encoded_key:
key = db.Key(encoded=encoded_key)
m = db.get(key)
yield op.db.Put(m)
而且隨着運行:
pipeline = IndexPipeline()
pipeline.start()
但我不斷收到此錯誤:
Handler yielded two: ['a'] , but no output writer is set.
我試着在source的某處找到設置輸出寫入器的地方,但取得成功。我發現的唯一情況是應該在某個地方設置一個output_writer_class
。
有誰知道如何設置?
請注意,StoreOutput
中的參數encoded_key
總是顯示無。