
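I'm using the deferred task queue library with GAE. Every day I need to send a piece of text to all users connected to a page in my application. Several pages are connected to my app, so for each page I want to iterate over all of its users and send them the daily message. I iterate over the user table with a cursor in batches of 800. If a page has more than 800 users, I want to remember where the cursor stopped and start another task for the remaining users. How can I make sure every user gets the daily message, using only GAE and the deferred task queue?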

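I just want to make sure that, with my algorithm, every user receives the message exactly once: I don't want to miss any user, and no user should receive the same message twice.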

Does this look like a correct algorithm for my situation?

from google.appengine.ext import deferred

# PageProfile and User are this app's ndb models (definitions not shown).

def send_news(page_cursor=None, page_batch_size=1,
              user_cursor=None, user_batch_size=800):

    p_query = PageProfile.query(PageProfile.subscribed == True)
    all_pages, next_page_cursor, page_more = p_query.fetch_page(
        page_batch_size, start_cursor=page_cursor)

    for page in all_pages:
        if page.page_news_url and page.subscribed:
            query = User.query(User.subscribed == True, User.page_id == page.page_id)
            all_users, next_user_cursor, user_more = query.fetch_page(
                user_batch_size, start_cursor=user_cursor)

            for user in all_users:
                user.sendNews()

            # If there are more users on this page, remember the cursor
            # and get the next 800 users on this same page
            if user_more:
                deferred.defer(send_news, page_cursor=page_cursor,
                               user_cursor=next_user_cursor)

    # If there are more pages left, use another deferred task to
    # send the daily news to the users on that page
    if page_more:
        deferred.defer(send_news, page_cursor=next_page_cursor)

    return "OK"
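One thing worth checking in the code above: a task that defers a continuation for the same page still falls through to `if page_more:` and schedules the next page as well, so every extra batch of users would enqueue the next page again, and its users would be messaged more than once. The sketch below simulates the cursor chain without App Engine so the coverage can be checked. Everything in it is a hypothetical stand-in: `PAGES` and `USERS` replace the datastore, `fetch_page` mimics ndb's `(results, cursor, more)` return value using an integer offset as the cursor, and `defer`/`task_queue` replace the deferred queue. The change it illustrates is that a task which defers a continuation for the current page returns immediately, and only the task that handles a page's last batch moves on to the next page.

```python
from collections import deque

# Hypothetical in-memory data: page ids and the subscribed users of each page.
PAGES = ["p1", "p2", "p3"]
USERS = {"p1": [f"u{i}" for i in range(5)], "p2": [], "p3": ["a", "b", "c"]}

def fetch_page(items, batch_size, start_cursor=None):
    """Mimic ndb's fetch_page: return (results, next_cursor, more).
    The 'cursor' here is just an integer offset into the list."""
    start = start_cursor or 0
    end = start + batch_size
    return items[start:end], end, end < len(items)

task_queue = deque()  # stand-in for the deferred task queue
sent = []             # every (page, user) message that was "sent"

def defer(fn, **kwargs):
    task_queue.append((fn, kwargs))

def send_news(page_cursor=None, user_cursor=None, user_batch_size=2):
    # One page per task, as with page_batch_size=1 in the question.
    pages, next_page_cursor, page_more = fetch_page(PAGES, 1, start_cursor=page_cursor)
    for page in pages:
        users, next_user_cursor, user_more = fetch_page(
            USERS[page], user_batch_size, start_cursor=user_cursor)
        for user in users:
            sent.append((page, user))  # stand-in for user.sendNews()
        if user_more:
            # Continue on the same page and stop here, so this task
            # does NOT also schedule the next page.
            defer(send_news, page_cursor=page_cursor, user_cursor=next_user_cursor)
            return
    # Only the task that finished a page's last user batch moves on.
    if page_more:
        defer(send_news, page_cursor=next_page_cursor)

def run_all():
    """Drain the simulated queue, running tasks in the order they were added."""
    defer(send_news)
    while task_queue:
        fn, kwargs = task_queue.popleft()
        fn(**kwargs)
```

Calling `run_all()` leaves `sent` holding each of the 8 `(page, user)` pairs exactly once, including across the empty page `p2`. In the question's version the early `return` is missing, which is where the duplicate next-page tasks come from.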

Answer


You can wrap your `user.sendNews()` call in another deferred task with a specific name. App Engine will only create a task with a given name once.

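import time

from google.appengine.api import taskqueue
from google.appengine.ext import deferred

# Number of whole days since the epoch (integer division keeps it an int)
interval_num = int(time.time()) // (60 * 60 * 24)

args = ('positional_arguments_for_object',)  # placeholder positional args (trailing comma makes it a tuple)
kwargs = {'param': 'value'}                  # placeholder keyword args

# 'user_name' and 'page_name' are placeholders for the real user and page identifiers.
# Because the day number is part of the name, the task name for the same page
# and the same user stays the same for 24 hours.
task_name = '_'.join([
    'user_name',
    'page_name',
    str(interval_num),
])

try:
    # obj is the callable to run, e.g. a function that sends the news to one user
    deferred.defer(obj, *args, _name=task_name, _queue='my-queue',
                   _url='/_ah/queue/deferred', **kwargs)
except taskqueue.TaskAlreadyExistsError:
    # A task with this name already exists and most likely hasn't run yet
    pass
except taskqueue.TombstonedTaskError:
    # A task with this name was created recently, so the name can't be reused yet;
    # this should reset after a week or so
    pass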
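The naming idea can be checked without App Engine. The sketch below is a hypothetical simulation: `FakeQueue` stands in for a task queue that, like App Engine, rejects a second task with the same name, and the local `TaskAlreadyExistsError` stands in for `taskqueue.TaskAlreadyExistsError`. `daily_task_name` builds `user_page_day` names as in the answer and validates them against App Engine's task-name rule (letters, digits, underscores and hyphens, at most 500 characters). Real user or page ids containing other characters would need escaping first.

```python
import re
import time

SECONDS_PER_DAY = 60 * 60 * 24
# App Engine task names must match this pattern.
TASK_NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{1,500}$")

class TaskAlreadyExistsError(Exception):
    """Local stand-in for taskqueue.TaskAlreadyExistsError."""

class FakeQueue:
    """Hypothetical in-memory queue that refuses to reuse a task name."""
    def __init__(self):
        self.names = set()
        self.tasks = []

    def add(self, name, payload):
        if name in self.names:
            raise TaskAlreadyExistsError(name)
        self.names.add(name)
        self.tasks.append(payload)

def daily_task_name(user_id, page_id, now=None):
    """Build a name that stays the same for one user and page for a whole UTC day."""
    day = int(now if now is not None else time.time()) // SECONDS_PER_DAY
    name = "_".join([str(user_id), str(page_id), str(day)])
    if not TASK_NAME_RE.match(name):
        raise ValueError("invalid task name: %r" % name)
    return name

def enqueue_news(queue, user_id, page_id, now=None):
    """Return True if a task was enqueued, False if today's task already existed."""
    try:
        queue.add(daily_task_name(user_id, page_id, now), (user_id, page_id))
        return True
    except TaskAlreadyExistsError:
        return False
```

Enqueuing the same user and page twice on one day returns `False` the second time, while the next day (or a different user) gets a fresh name. Note that the day boundary is midnight UTC, not the moment the first task was created.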

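Note that, as far as I remember, App Engine does not guarantee that a task will be executed only once: in some edge cases it may run more than once, so ideally your tasks should be idempotent. If this edge case matters to you, you can transactionally read/write a flag in the datastore for each task, and before doing the work check whether that entity already exists; if it does, cancel the execution.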
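The flag check described above can be sketched as a check-and-set that runs atomically before the message is sent. In this hypothetical simulation, `FakeDatastore` stands in for the datastore and its `threading.Lock` plays the role of a datastore transaction; `run_send_task` and the `deliveries` list (standing in for `user.sendNews()`) are illustrative names, not App Engine APIs.

```python
import threading

class FakeDatastore:
    """Hypothetical stand-in for the datastore; the lock plays the role of a transaction."""
    def __init__(self):
        self._lock = threading.Lock()
        self._flags = set()

    def claim(self, key):
        """Atomically create the flag entity; return False if it already existed."""
        with self._lock:
            if key in self._flags:
                return False
            self._flags.add(key)
            return True

def run_send_task(store, deliveries, user_id, page_id, day):
    """Send one daily message unless a previous execution already claimed it."""
    key = (user_id, page_id, day)
    if not store.claim(key):
        return False  # the task already ran (or is running); skip it
    deliveries.append(key)  # stand-in for user.sendNews()
    return True
```

Even if the same task runs several times concurrently, only one execution wins the claim. This sketch sets the flag before sending, so a crash between the two steps would skip that user rather than message them twice; setting it after sending makes the opposite trade-off.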
