2012-02-12 81 views
8

我正在研究appengine-mapreduce函數,並修改了演示以適合我的目的。 基本上,我有以下格式的行數百萬:userid,time1,time2。我的目的是爲每個用戶標識找出time1和time2之間的區別。用appengine-mapreduce命中內存限制

然而,正如我在谷歌應用程序引擎運行此,我遇到在日誌部分此錯誤消息:

超出軟專用空間限制與180.56 MB服務130個請求後的總 雖然處理這個請求,處理這個請求的進程被發現使用了太多的內存並被終止。這很可能會導致下一個請求應用程序使用新的進程。如果您經常看到此消息,那麼您的應用程序中可能會有內存泄漏。

def time_count_map(data): 
    """Time count map function.""" 
    (entry, text_fn) = data 
    text = text_fn() 

    try: 
    q = text.split('\n') 
    for m in q: 
     reader = csv.reader([m.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
    logging.debug(e) 


def time_count_reduce(key, values): 
    """Time count reduce function.""" 
    time = 0.0 
    for subtime in values: 
    time += float(subtime) 
    realtime = int(time) 
    yield "%s: %d\n" % (key, realtime) 

任何人都可以建議我怎麼回事,可以優化我的代碼更好?謝謝!!

編輯:

這裏的管線處理程序:

class TimeCountPipeline(base_handler.PipelineBase): 
    """A pipeline to run Time count demo. 

    Args: 
    blobkey: blobkey to process as string. Should be a zip archive with 
     text files inside. 
    """ 

    def run(self, filekey, blobkey): 
    logging.debug("filename is %s" % filekey) 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "time_count", 
     "main.time_count_map", 
     "main.time_count_reduce", 
     "mapreduce.input_readers.BlobstoreZipInputReader", 
     "mapreduce.output_writers.BlobstoreOutputWriter", 
     mapper_params={ 
      "blob_key": blobkey, 
     }, 
     reducer_params={ 
      "mime_type": "text/plain", 
     }, 
     shards=32) 
    yield StoreOutput("TimeCount", filekey, output) 

Mapreduce.yaml:

mapreduce: 
- name: Make messages lowercase 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.lower_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 
- name: Make messages upper case 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.upper_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 

文件的其餘部分是完全一樣的演示。

我上傳的代碼我複製上的Dropbox:http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

+0

你可以顯示你的mapreduce配置嗎?出於某種原因,它看起來像將整個文件傳遞給映射器,而不是逐行映射它。 – 2012-02-12 18:45:09

+0

嗨丹尼爾,我的問題已被編輯。謝謝,真的很感激! – autumngard 2012-02-13 00:42:00

回答

2

很可能你的輸入文件的大小超過了軟內存限制。對於大文件,請使用BlobstoreLineInputReaderBlobstoreZipLineInputReader

這些輸入閱讀器將不同的東西傳遞給map函數,它們傳遞文件中的start_position和文本行。

map功能可能看起來像:

def time_count_map(data): 
    """Time count map function.""" 
    text = data[1] 

    try: 
     reader = csv.reader([text.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
     logging.debug(e) 

使用BlobstoreLineInputReader將允許作業更快,因爲它可以使用一個以上的碎片,最多256個運行,但它意味着你需要上傳你的文件未壓縮,這可能是一個痛苦。我通過將壓縮文件上傳到EC2 windows服務器來處理它,然後從那裏解壓並上傳,因爲上行帶寬非常大。

+0

這對我來說非常好!非常感謝! :) – autumngard 2012-02-13 07:11:24

6

還可以考慮在代碼期間的常規點調用gc.collect()。我看到過幾個SO問題,關於通過調用gc.collect()緩解的超限軟內存限制,大部分都與blobstore有關。

+0

調用gc.collect()只適用於blobstore或一般? – marcadian 2014-07-07 19:58:13