2011-06-14 24 views
1

我的數據存儲庫中有大量來自外部數據源的實體(產品)。我想每天檢查他們的更新。App Engine:檢查數據存儲區中數據更新,同時避免數據存儲區寫入的最佳方法

有些項目已經更新,因爲應用程序直接獲取它們。有些是新插入的,不需要更新。

對於那些沒有被抓取的我有cron作業正在運行。我使用Python API。

目前我做了以下工作。

我有一個字段

dateupdated = db.DateTimeProperty(auto_now_add=True) 

然後我就可以用

query = dbmodel.product.all() 
query.filter('dateupdated <', newdate) 
query.order('dateupdated')   
results = query.fetch(limit=mylimit, offset=myoffset) 

挑最早的條目,並安排它們進行更新。我將Task Queue與自定義任務名稱一起使用,以確保每次產品更新只能每天運行一次。

問題是,我需要更新字段dateupdated,這意味着數據存儲區寫入,即使產品的數據沒有改變,只是爲了跟蹤更新過程。

這消耗了大量的資源(CPU小時數,數據存儲API調用等)。

是否有更好的方式來執行這樣的任務並避免不必要的數據存儲寫入?

+0

我不是很確定你爲什麼要更新dateupdated字段?或者爲什麼你不存儲「我們運行支票的最後日期」並使用它而不是通過一些偏移量,但是無論哪種方式,我認爲遊標就是你要找的。 – 2011-06-14 13:14:00

+0

如果您每天運行一次查詢,則無需根據上次檢查時的情況進行篩選,是嗎?至少在一天前,所有產品都會在上次運行時進行檢查。無論如何,除非你處理數以百萬計的產品,否則在記錄中寫入幾個時間戳不太可能給CPU時間帶來很大的負擔。 – 2011-06-15 04:45:39

+0

@尼克約翰遜我過濾排除已被其他進程更新或新的項目。我也可以跳過這一點,因爲taskqueue不允許我每天安排多個產品更新(使用自定義名稱)。問題是:什麼更高效?無論如何,我的理解是,使用像'myproduct = product.get_by_key_name(productkey) myproduct.dateupdated = datetime.datetime.now() product.put(myproduct)'與存儲整個entitiy是一樣的嗎?還是你建議它只會更新更改的字段並消耗更少的CPU? – 2011-06-15 09:39:17

回答

1

是,使用cursors

通過由dateupdated訂購查詢,然後存儲光標你處理你的實體後,您可以稍後重新運行相同的查詢只讓你的最後一次查詢後更新的項目。

因此,考慮到像

class MyEntity(db.model): 
    dateupdated = db.DateTimeProperty(auto_now_add=True) 

,你可以設置一個處理一個類來運行像一個任務:

class ProcessNewEntities(webapp.RequestHandler): 
    def get(self): 
     """Run via a task to process batches of 'batch_size' 
     recently updated entities""" 
     # number of eneities to process per task execution 
     batch_size = 100 
     # build the basic query 
     q = MyEntity.all().order("dateupdated") 
     # use a cursor? 
     cursor = self.request.get("cursor") 
     if cursor: 
      q.with_cursor(cursor) 
     # fetch the batch 
     entities = q.fetch(batch_size) 
     for entity in entities: 
      # process the entity 
      do_your_processing(entity) 
     # queue up the next task to process the next 100 
     # if we have no more to process then delay this task 
     # for a while so that it doesn't hog the application 
     delay = 600 if len(entities)<batch_size else 0 
     taskqueue.add(
      url='/tasks/process_new_entities', 
      params={'cursor': q.cursor()}, 
      countdown=delay) 

,然後你只需要觸發任務執行的開始像:

def start_processing_entities(): 
    taskqueue.add(url='/tasks/process_new_entities') 
+0

謝謝你的詳細描述,看起來很有希望。我看到的唯一問題是,我無法像使用'query.filter('dateupdated <',newdate)'這樣的過濾器。但我相信有辦法解決這個問題。 – 2011-06-14 16:37:55

相關問題