2014-07-05 42 views
2

請參閱下面更新的問題。Sidekiq處理大數據時處理重新排隊

原題:

在我目前的Rails項目,我需要解析大型XML/CSV數據文件,並將其保存到MongoDB的。 現在我用這個步驟:

  1. 從用戶接收上傳的文件,將數據存儲到MongoDB的
  2. 使用sidekiq執行MongoDB中數據的異步處理。
  3. 處理完成後,刪除原始數據。

對於localhost中的中小型數據,上述步驟運行良好。但是在heroku中,我使用hirefire來動態調整工人動態範圍。當工作人員仍在處理大量數據時,hirefire會查看空隊列並縮小工人動態範圍。這會向進程發送kill信號,並使進程處於不完整狀態。

我正在尋找一種更好的方法來進行解析,允許解析過程在任何時候都被殺死(在接收到kill信號時保存當前狀態),並允許進程重新排隊。

現在我正在使用Model.delay.parse_file,它不會重新排隊。

UPDATE

閱讀sidekiq維基之後,我發現article about job control。任何人都可以解釋代碼,它是如何工作的,以及在接收到SIGTERM信號並且工人重新排隊時如何保持它的狀態?

是否有任何替代方法來處理作業終止,保存當前狀態,並從最後位置繼續?

感謝,

回答

6

可能會更容易解釋的進程和高級別步驟,得到樣品的實現(一個,我使用的是簡化版),然後再談拋出和捕獲:

  1. 插入有一個遞增的指數原始的CSV行(以便能夠從一個特定的行/指數稍後恢復)
  2. 過程中的CSV阻止每一個「塊」來檢查工作,通過檢查做,如果Sidekiq::Fetcher.done?返回true
  3. 當提取器是done?,存儲索引用戶當前處理的項目並返回,以便作業completes和控件返回到sidekiq。
  4. 請注意,如果作業在短暫超時(默認20秒)後仍在運行,作業將被終止。
  5. 然後作業再次簡單地運行時,啓動你離開的地方最後一次(或0)

例子:

class UserCSVImportWorker 
     include Sidekiq::Worker 

     def perform(user_id) 
     user = User.find(user_id) 

     items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i}) 
     items.each_with_index do |item, i| 
      if (i+1 % 100) == 0 && Sidekiq::Fetcher.done? 
      user.update(last_csv_index: item.index) 

      return 
      end 

      # Process the item as normal 
     end 
     end 
    end 

上面的類可以確保每100個項目,我們檢查fetcher沒有完成(如果shutdown已經開始的代理),並且結束作業的執行。然而,在執行結束之前,我們使用已處理的最後一個index來更新用戶,以便我們可以在下次停止時開始。

拋出捕捉是一種實現上述功能的方法,它可能是一個小清潔器(可能),但有點像使用Fibers,很好的概念,但很難把你的頭包裹起來。從技術上來說,投擲比起大多數人來說更像是轉身。

編輯

而且你不能讓呼叫Sidekiq::Fetcher.done?並記錄每行或處理,這樣,如果你的工人被殺害行的每個塊的last_csv_index無需機會記錄last_csv_index你仍然可以恢復「關閉」到你離開的地方。

+0

嗨@nort,我還有其他問題給你。如何處理Sidekiq :: Fetcher.done?在另一個班級內循環?在執行過程中,我只有一次調用'ModelContainer.find(model_id).parse_data' –

+0

應該實際上仍然可以正常工作,雖然有點未封裝...我在那裏的邏輯我傾向於放在工人而不是模型保持模型的輕薄。 – nort

+0

參見:https://github.com/mperham/sidekiq/issues/1845 – nort

3

您正在嘗試解決冪等的概念,認爲處理的事情多次與潛在的不完整的週期不會導致問題的想法。前

  1. https://github.com/mperham/sidekiq/wiki/Best-Practices#2-make-your-jobs-idempotent-and-transactional

    可能採取的步驟拆分文件成幾部分,並處理每部分工作的部分。

  2. 提高租用門限,以便在作業可能已完全完成時(10分鐘)
  3. 在工作正在進行時不允許租用縮小(在開始時設置redis鍵並清除完成時)
  4. 跟蹤作業在進行處理時的進度,以及如果作業被殺死時停止離開的地方。
+0

謝謝@nort,我會試試你的方法 –