2010-05-31 24 views
0

我已經在我爲客戶託管的站點上設置了此任務隊列實現,它有一個cron job,它每天凌晨2點運行「/admin/tasks/queue」,這將排隊發送電子郵件,「/admin/tasks/email」,並使用cursors至於做小塊排隊。由於某種原因,昨晚/admin/tasks/queue不斷運行此代碼,因此發送了我的整個電子郵件:/的配額。我用這段代碼做錯了什麼?這個任務隊列設置有什麼問題?

class QueueUpEmail(webapp.RequestHandler): 
    def post(self): 
     subscribers = Subscriber.all() 
     subscribers.filter("verified =", True) 

     last_cursor = memcache.get('daily_email_cursor') 
     if last_cursor: 
      subscribers.with_cursor(last_cursor) 

     subs = subscribers.fetch(10) 
     logging.debug("POST - subs count = %i" % len(subs)) 
     if len(subs) < 10: 
      logging.debug("POST - Less than 10 subscribers in subs") 
      # Subscribers left is less than 10, don't reschedule the task 
      for sub in subs: 
       task = taskqueue.Task(url='/admin/tasks/email', params={'email': sub.emailaddress, 'day': sub.day_no}) 
       task.add("email") 
      memcache.delete('daily_email_cursor') 
     else: 
      logging.debug("POST - Greater than 10 subscibers left in subs - reschedule") 
      # Subscribers is 10 or greater, reschedule 
      for sub in subs: 
       task = taskqueue.Task(url='/admin/tasks/email', params={'email': sub.emailaddress, 'day': sub.day_no}) 
       task.add("email") 
      cursor = subscribers.cursor() 
      memcache.set('daily_email_cursor', cursor) 
      task = taskqueue.Task(url="/admin/tasks/queue", params={}) 
      task.add("queueup") 

回答

2

我可以看到一些潛在的問題。首先,您將光標存儲在內存緩存中,但不保證任何內容。如果您在處理中途錯過了緩存,則會再次發送每條消息。其次,如果任務失敗,任務將被重新嘗試;他們應該被設計成爲這個原因的冪等性。當然,在發送電子郵件的情況下,這幾乎是不可能的,因爲一旦發送了消息,如果您的任務在發送之後由於某種其他原因而死亡,則無法回滾。至少,我建議在發送消息後嘗試更新每個Subscriber實體上的「最後通過電子郵件發送的日期」字段。當然,這本身並不是萬無一失的,因爲電子郵件發送可能會成功,並且在此之後實體的更新可能會失敗。這也會增加整個過程的開銷,因爲你會爲每個用戶進行寫操作。

+0

感謝您的分析,我最初的想法是memcache可能是一個問題。 – 2010-05-31 20:12:13

+0

Memcache可能是您遇到問題背後的直接原因,是的。你最好的辦法是將光標作爲每個任務的參數傳遞給下一個任務。 – 2010-06-01 09:18:28