3

我有在多種類型的資源上處理的sidekiq作業。但是,對於特定類型的資源,例如:資源X,我需要確保在任何給定時間只有一個sidekiq作業可以處理該特定資源。例如,如果我有3個sidekiq作業同時排隊,並且想要與資源X進行交互,那麼只有一個sidekiq作業可以處理資源X,而其餘的兩個作業將不得不等待(或重新排隊) ),直到當前正在處理資源的sidekiq作業完成。當現有的sidekiq作業正在處理特定的資源時,阻止/重新排隊處理其他sidekiq作業

當前,我試圖在數據庫表中添加一條記錄,以便在sidekiq作業正在處理資源時使用該記錄來阻止其他sidekiq作業處理資源,直到該記錄被sidekiq作業從數據庫中刪除(當它完成處理資源X時)或經過一段時間後(例如:如果記錄創建超過5分鐘前​​,則認爲它不再擁有對資源X的獨佔訪問權,並且下一個sidekiq想要處理資源X的作業可能會改變該記錄並要求獨佔訪問資源X)。

我當前實現的僞代碼:

def perform(res_id, res_type) 

    # Only applies to "RESOURCE_X" 
    if res_type == RESOURCE_X 
    if ResourceProcessor.where(res_id).empty? || ((Time.now-ResourceProcessor.where(res_id).first.created_at) > 5.minutes) 
     ResourceProcessor.create(res_id: res_id).save 
     process_resource_x(res_id) 
    else 
     SidekiqWorker.delayed(res_id, res_type, 5.minutes) #Try again later 
     return 
    end 

    #Letting other sidekiq jobs know they can now fight over who gets to process resource X 
    ResourceProcessor.where(res_id).destroy 

    else 
    process_other_resource(res_id) 
    end 

end 

不幸的是,我的解決辦法是行不通的。如果希望處理資源X的sidekiq作業之間存在延遲,則工作得很好。但是,如果要處理資源X的作業同時到達,那麼我的解決方案就會崩潰。

有沒有什麼辦法可以在處理資源X時執行某種同步?

順便說一句,我的sidekiq作業可能分佈在多臺機器上(但他們訪問專用機器上的同一個redis服務器)。

+2

您可能正在尋找一個**鎖**,以便一次只有一個線程/進程修改資源。這裏有一個很好的閱讀,建議使用數據庫來協調鎖定:https://makandracards.com/makandra/31937-differences-between-transactions-and-locking。在[with_advisory_lock gem](https://github.com/mceachen/with_advisory_lock)中實現了一些更多的鎖定策略。 –

回答

0

我根據Thomas提供的評論做了更多的研究。

他提供的鏈接非常有用。他們實現了自己的自定義Lock類,以實現他們想要的結果。但是,我沒有使用自定義鎖定代碼,因爲我需要不同的行爲。

我期望實現的具體行爲是「重新排隊如果鎖定」而不是「等待鎖定」。

我還可以使用其他替代工具,例如redis-semaphorewith_advisory_gem。 我測試了redis-semaphore並發現它有bug。它沒有正確返回鎖狀態和資源計數。此外,在檢查Github上的問題之後,在某些情況下,redis-semaphore可能陷入自己的僵局,所以我決定放棄使用它。因此,我還決定不使用with_advisory_gem,因爲它的星號低於redis-semaphore。

最後我發現了一種方法來實現我的問題中描述的鎖定模式,即根據數據庫中的值阻止sidekiq作業。我通過鎖定整個數據庫行,使用rail自​​己的Locking-pessimistic類來處理讀取陳舊值的多個sidekiq作業的併發問題。這確保了只有1個sidekiq worker可以訪問在任何給定時間保存鎖定值的數據庫行。鎖定時間保持最小值,因爲只有讀取和適用時才執行寫入操作,同時鎖定數據庫行。之後的後續操作,例如進行請求和清理。