2

我已經創建了兩個MapReduce管道用於上傳CSV文件以批量創建類別和產品。每個產品都通過KeyProperty綁定到類別。類別和產品模型是基於ndb.Model構建的,所以根據文檔,我認爲從數據存儲庫中檢索時,它們會自動緩存在Memcache中。ndb使用MapReduce時,模型不會保存在內存緩存中

我已經在服務器上運行這些腳本來上傳30個類別,然後是3000個產品。所有數據按預期的方式出現在數據存儲區中。

但是,它似乎不像產品上傳使用Memcache來獲取類別。當我檢查門戶網站中的Memcache查看器時,它說明了命中計數大約爲180,並且計數錯誤數在60左右。如果我每次上傳3000個產品並檢索該類別,應該不會有大約3000個命中+錯過獲取類別(即Category.get_by_id(category_id))?在創建新的產品(算法可處理實體創建和更新)之前,嘗試檢索現有產品可能還會有3000多次。

這裏的相關產品映射功能,這需要從CSV文件中的行以創建或更新產品:

def product_bulk_import_map(data): 
    """Product Bulk Import map function.""" 

    result = {"status" : "CREATED"} 
    product_data = data 

    try: 
     # parse input parameter tuple 
     byteoffset, line_data = data 

     # parse base product data 
     product_data = [x for x in csv.reader([line_data])][0] 
     (p_id, c_id, p_type, p_description) = product_data 

     # process category 
     category = Category.get_by_id(c_id) 
     if category is None: 
      raise Exception(product_import_error_messages["category"] % c_id) 

     # store in datastore 
     product = Product.get_by_id(p_id) 
     if product is not None: 
      result["status"] = "UPDATED" 
      product.category = category.key 
      product.product_type = p_type 
      product.description = p_description 
     else: 
      product = Product(
       id = p_id, 
       category = category.key, 
       product_type = p_type, 
       description = p_description 
      ) 
     product.put() 
     result["entity"] = product.to_dict() 
    except Exception as e: 
     # catch any exceptions, and note failure in output 
     result["status"] = "FAILED" 
     result["entity"] = str(e) 

    # return results 
    yield (str(product_data), result) 
+0

您能否提供有關如何在memcache中存儲/獲取數據的信息?你使用什麼鍵?請記住,memcache不接受按鍵中的特殊符號(如空格)。 – 2014-10-07 12:52:44

+0

獲取通過.get_by_id()完成,並且通過.put()完成存儲。 類別ID是簡單的字符串(「書籍」,「電影」等)。產品ID目前只是數字(1,2,3 ...),但在發佈之前,我們可能會將其更改爲類別和數字(book_1,book_2,movie_1等)的組合。如果我需要改變,我可以,我只是想要一些相當簡單的東西,這將允許我們使用CSV導入來添加新條目並修改已過期信息或打字錯誤的舊條目。 – 2014-10-07 12:59:51

+0

我認爲這可能是由於內部導致的,上下文緩存,在memcache之前使用。您可以嘗試禁用它以查看是否看到大量的內存緩存命中。當然,上下文緩存比memcache效率更高 – 2014-10-08 06:13:11

回答

8

MapReduce的故意禁用內存緩存爲NDB。

mapreduce/util.py LN 373 _set_ndb_cache_policy()(截至2015年5月1日):

def _set_ndb_cache_policy(): 
    """Tell NDB to never cache anything in memcache or in-process. 

    This ensures that entities fetched from Datastore input_readers via NDB 
    will not bloat up the request memory size and Datastore Puts will avoid 
    doing calls to memcache. Without this you get soft memory limit exits, 
    which hurts overall throughput. 
    """ 
    ndb_ctx = ndb.get_context() 
    ndb_ctx.set_cache_policy(lambda key: False) 
    ndb_ctx.set_memcache_policy(lambda key: False) 

您可以強制get_by_id()put()使用內存緩存,如:

product = Product.get_by_id(p_id, use_memcache=True) 
... 
product.put(use_memcache=True) 

或者,您可以修改NDB上下文如果你正在批量放入mapreduce.operation。但是我不知道夠說,這是否有其他不期望的效果:

ndb_ctx = ndb.get_context() 
ndb_ctx.set_memcache_policy(lambda key: True) 
... 
yield operation.db.Put(product) 

至於有關「軟內存限制出口」的文檔字符串,我不明白爲什麼如果唯一的memcache是會發生啓用(即沒有上下文緩存)。

它實際上就像你想要 memcache被啓用put,否則你的app最終會在你的mapper修改了下面的數據後從NDB的memcache讀取陳舊的數據。

+0

需要注意的事項:使用mapreduce庫中提供的緩存策略,如果更新任何實體(例如,如果您正在進行遷移),則會完全跳過memcache ...這意味着如果實體已經在memcache中,並且使用mapreduce更新它,它將直接在數據存儲中更新,但之前的(陳舊的)副本仍將保存在memcache中,並通過任何後續的NDB獲取返回。你必須記得使用'.put(use_memcache = True)'來替換陳舊的副本,或者手動刪除memcache鍵 – 2016-08-04 18:42:15

1

由於Slawek Rewaj已經提到的,這是由在上下文緩存造成的。當檢索實體時,NDB首先嚐試上下文緩存,然後嘗試memcache,最後如果在上下文緩存和內存緩存中均未找到,則最終從數據存儲中檢索該實體。上下文緩存僅僅是一個Python字典,其生命週期和可見性僅限於當前請求,但MapReduce會在一個請求中對product_bulk_import_map()進行多次調用。

您可以找到有關在上下文緩存這裏的更多信息:https://cloud.google.com/appengine/docs/python/ndb/cache#incontext

+0

不是由上下文緩存引起的。請參閱下面的答案。 :) – Scotty 2015-05-01 02:45:07