Hitting the memory limit with appengine-mapreduce

I'm working with the appengine-mapreduce functions and have modified the demo to fit my purpose. Basically, I have millions of lines in the following format: userid, time1, time2. My purpose is to find the difference between time1 and time2 for each userid.
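For illustration, a line of input looks something like this (made-up values, matching the %m/%d/%y %I:%M:%S%p timestamp format used in the code below):

    user_001,01/15/12 09:30:00AM,01/15/12 11:45:30AM
    user_002,01/15/12 10:00:00PM,01/15/12 11:20:15PM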
However, as I run this on Google App Engine, I get the following error message in the logs section:

Exceeded soft private memory limit with 180.56 MB after servicing 130 requests total. While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.
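To help narrow down where the memory goes, a minimal debugging sketch (not part of the demo; it assumes the google.appengine.api.runtime API is available on this runtime) is to log the instance's memory footprint at a few points inside the map function:

    from google.appengine.api.runtime import runtime
    import logging

    def log_memory(tag):
        # Log the instance's current memory footprint in MB, so growth
        # can be correlated with mapper progress in the logs.
        logging.info("%s: memory %.2f MB", tag, runtime.memory_usage().current())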
Here are my map and reduce functions:

    import csv
    import logging
    import time

    def time_count_map(data):
        """Time count map function."""
        (entry, text_fn) = data
        text = text_fn()
        try:
            q = text.split('\n')
            for m in q:
                reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
                for s in reader:
                    # Calculate the time elapsed between the two timestamps.
                    sdw = s[1]
                    start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
                    edw = s[2]
                    end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
                    time_difference = time.mktime(end_date) - time.mktime(start_date)
                    yield (s[0], time_difference)
        except IndexError as e:
            logging.debug(e)

    def time_count_reduce(key, values):
        """Time count reduce function."""
        total = 0.0  # renamed from `time` to avoid shadowing the time module
        for subtime in values:
            total += float(subtime)
        realtime = int(total)
        yield "%s: %d\n" % (key, realtime)
Can anyone suggest how I could optimize my code better? Thanks!!
Edit:
Here's the pipeline handler:
    class TimeCountPipeline(base_handler.PipelineBase):
        """A pipeline to run Time count demo.

        Args:
            blobkey: blobkey to process as string. Should be a zip archive with
                text files inside.
        """

        def run(self, filekey, blobkey):
            logging.debug("filename is %s" % filekey)
            output = yield mapreduce_pipeline.MapreducePipeline(
                "time_count",
                "main.time_count_map",
                "main.time_count_reduce",
                "mapreduce.input_readers.BlobstoreZipInputReader",
                "mapreduce.output_writers.BlobstoreOutputWriter",
                mapper_params={
                    "blob_key": blobkey,
                },
                reducer_params={
                    "mime_type": "text/plain",
                },
                shards=32)
            yield StoreOutput("TimeCount", filekey, output)
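For completeness, the pipeline is kicked off the same way as in the demo, roughly like this (paraphrased from the demo's upload handler; the handler name and request parameter names here are my assumptions):

    from google.appengine.ext import webapp

    class StartProcessHandler(webapp.RequestHandler):
        def post(self):
            filekey = self.request.get("filekey")
            blob_key = self.request.get("blobkey")
            # Start the pipeline and redirect to its status UI.
            pipeline = TimeCountPipeline(filekey, blob_key)
            pipeline.start()
            self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)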
Mapreduce.yaml:
    mapreduce:
    - name: Make messages lowercase
      params:
      - name: done_callback
        value: /done
      mapper:
        handler: main.lower_case_posts
        input_reader: mapreduce.input_readers.DatastoreInputReader
        params:
        - name: entity_kind
          default: main.Post
        - name: processing_rate
          default: 100
        - name: shard_count
          default: 4
    - name: Make messages upper case
      params:
      - name: done_callback
        value: /done
      mapper:
        handler: main.upper_case_posts
        input_reader: mapreduce.input_readers.DatastoreInputReader
        params:
        - name: entity_kind
          default: main.Post
        - name: processing_rate
          default: 100
        - name: shard_count
          default: 4
The rest of the files are exactly the same as the demo.
I've uploaded a copy of my code to Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
Can you show your mapreduce configuration? For some reason it looks like the whole file is being passed to the mapper, instead of being mapped line by line. – 2012-02-12 18:45:09
Hi Daniel, my question has been edited. Thank you, really appreciate it! – autumngard 2012-02-13 00:42:00