2014-01-10 57 views
3

我正在與Sidekiq一個高流量的測試,創建使用Mongoid作爲我在Rails應用4基於驅動MongoDB的對象重複的ID創建文檔。我看到的問題是,當PlayByPlay文檔應該具有唯一的game_id時,我看到使用相同的確切game_id創建了多個PlayByPlay對象。我也強制MongoDB的唯一約束,這仍然在發生。這是我的文檔,它是嵌入式文檔,以及我如何創建文檔的一瞥。問題是,這一切都發生在使用Sidekiq的線程環境中,我不確定是否有辦法解決它。我的寫作關注在mongoid.yml中設置爲1,看起來safe選項在主版本中被刪除,persist_in_safe_mode也被刪除。代碼如下 - 關於如何正確工作的任何建議,將不勝感激。這不是一個副本集,它是目前單個MongoDB服務器執行所有讀取/寫入請求。Mongoid 4(GitHub的主機)與

module MLB 
    class Play 
     include Mongoid::Document 
     include Mongoid::Timestamps 

     embedded_in :play_by_play 

     field :batter#, type: Hash 
     field :next_batter#, type: Hash 
     field :pitchers#, type: Array 
     field :pitches#, type: Array 
     field :fielders#, type: Array 
     field :narrative, type: String 
     field :seq_id, type: Integer 
     field :inning, type: Integer 
     field :outs 
     field :no_play 
     field :home_team_score 
     field :away_team_score 
    end 
    class PlayByPlay 
     include Mongoid::Document 
     include Mongoid::Timestamps 

     embeds_many :plays, cascade_callbacks: true 
     accepts_nested_attributes_for :plays 

     field :sport 
     field :datetime, type: DateTime 
     field :gamedate, type: DateTime 
     field :game_id 
     field :home_team_id 
     field :away_team_id 
     field :home_team_score 
     field :away_team_score 
     field :season_year 
     field :season_type 
     field :location 
     field :status 
     field :home_team_abbr 
     field :away_team_abbr 
     field :hp_umpire 
     field :fb_umpire 
     field :sb_umpire 
     field :tb_umpire 

     index({game_id: 1}) 
     index({away_team_id: 1}) 
     index({home_team_id: 1}) 
     index({season_type: 1}) 
     index({season_year: 1}) 

     index({"plays.seq_id" => 1}, {unique: true, drop_dups: true}) 
     #validates 'play.seq_id', uniqueness: true 
     validates :game_id, presence: true, uniqueness: true 
     validates :home_team_id, presence: true 
     validates :away_team_id, presence: true 
     validates :gamedate, presence: true 
     validates :datetime, presence: true 
     validates :season_type, presence: true 
     validates :season_year, presence: true 

     def self.parse!(entry) 
      @document = Nokogiri::XML(entry.data) 
      xslt = Nokogiri::XSLT(File.read("#{$XSLT_PATH}/mlb_pbp.xslt")) 
      transform = xslt.apply_to(@document) 
      json_document = JSON.parse(transform) 

      obj = find_or_create_by(game_id: json_document['game_id']) 
      obj.sport     = json_document['sport'] 
      obj.home_team_id  = json_document['home_team_id'] 
      obj.away_team_id  = json_document['away_team_id'] 
      obj.home_team_score = json_document['home_team_score'] 
      obj.away_team_score = json_document['away_team_score'] 
      obj.season_type   = json_document['season_type'] 
      obj.season_year   = json_document['season_year'] 
      obj.location    = json_document['location'] 
      obj.datetime    = DateTime.strptime(json_document['datetime'], "%m/%d/%y %H:%M:%S") 
      obj.gamedate    = DateTime.strptime(json_document['game_date'], "%m/%d/%Y %H:%M:%S %p") 
      obj.status     = json_document['status'] 
      obj.home_team_abbr = json_document['home_team_abbr'] 
      obj.away_team_abbr = json_document['away_team_abbr'] 
      obj.hp_umpire   = json_document['hp_umpire'] 
      obj.fb_umpire   = json_document['fb_umpire'] 
      obj.sb_umpire   = json_document['sb_umpire'] 
      obj.tb_umpire   = json_document['tb_umpire'] 
      p=obj.plays.build(seq_id: json_document['seq_id']) 
      p.batter   = json_document['batter'] 
      p.next_batter = json_document['next_batter'] if json_document['next_batter'].present? && json_document['next_batter'].keys.count >= 1 
      p.pitchers  = json_document['pitchers'] if json_document['pitchers'].present? && json_document['pitchers'].count >= 1 
      p.pitches  = json_document['pitches'] if json_document['pitches'].present? && json_document['pitches'].count >= 1 
      p.fielders  = json_document['fielders'] if json_document['fielders'].present? && json_document['fielders'].count >= 1 
      p.narrative  = json_document['narrative'] 
      p.seq_id   = json_document['seq_id'] 
      p.inning   = json_document['inning'] 
      p.outs    = json_document['outs'] 
      p.no_play  = json_document['no_play'] 
      p.home_team_score = json_document['home_team_score'] 
      p.away_team_score = json_document['away_team_score'] 

      obj.save 
     end 

    end 
end 

**注意**

這個問題消失,如果我限制sidekiq 1名工人,這顯然在現實世界裏,我從來沒有這樣做。

回答

2

這是因爲許多線程插入對象以相同的game_id。讓我解釋一下。

例如,你有兩個sidekiq線程T1和T2。它們並行運行。假設你有一個文件game_id 1並且它沒有被插入到數據庫中。

  1. T1進入parse方法,它認爲沒有文件在數據庫中game_id 1,它會創建一個文件與game_id 1並繼續填充其他數據,但它並沒有保存文檔。

  2. t2進入parse方法,它與game_id 1在數據庫中看不到文件,因爲此時t1沒有保存文件。 t2使用相同的game_id 1創建一個文檔。

  3. T1保存文檔

  4. T2保存文檔

結果是:你有相同的game_id 1兩個文件。

爲了防止這種情況,你可以使用一個互斥鎖序列化解析代碼的訪問權限。要知道如何使用互斥,請閱讀本:http://www.ruby-doc.org/core-2.0.0/Mutex.html

+0

另一個解決方案是一個給定的時間之後執行的任務。 'MyWorker.perform_in(5.seconds,1,2,3)'雖然它可能不被接受。 –

+0

Mutex幾乎把工作者的水平可伸縮性置之度外,理想情況下,你需要鎖定整個集羣的東西 – bbozo

+0

@bbozo在這種情況下,如果OP在許多不同的計算機上運行sidekiq線程,Mutex將不起作用。但是,如果OP只在一臺計算機上運行它,它就可以工作。如果OP在許多計算機上運行sidekiq線程,他可能需要另一種解決方案。 – vidaica

0

@ vidaica的答案是有幫助的。如果您從內存或數據庫中獲取並遞增ID,它可能會解決您的問題。

但是,您的game_id未在parse中生成,它正通過entry JSON對象傳遞到parse

如何/哪裏是你game_id產生?

+0

在這種情況下,我想這不是什麼重要的game_id來源。對於多個文檔/條目來說,具有相同的game_id但不同的播放可能是合法的情況。如果處理具有重複game_id的文檔,他仍然調用find_or_create_by,因此如果沒有@vidaica描述的併發性,他將永遠不會獲得具有相同game_id的兩條記錄。 – moonfly

+0

但是,如果兩個文檔不應該具有相同的game_id,那麼當異步調用worker的函數時,另一個問題是潛在的併發問題。但是有find_or_create_by()這個事實,並且仍然有重複game_ids的這個問題,清楚地表明至少存在來自vidaica答案的併發問題。可能這不是唯一的問題,但它是第一個要解決的問題。 – moonfly

0

天真的方法是到#parse最後一行改爲:但是

if where(game_id: obj.game_id).count == 0 
    # handle it here 
end 

注意,這仍有可能重複項:

obj.save if where(game_id: obj.game_id).count == 0 

或者,如果你的手以某種方式處理它。

+0

此解決方案無法消除插入相同game_id的文檔。由於obj.save需要時間並且在保存完成時,其他線程可能會到達檢查代碼,並且仍然看不到具有相同game_id的現有文檔,然後插入其他文檔具有相同的game_id – vidaica

+0

它也不能解決比賽兩個線程同時執行計數的情況 – bbozo

3

你已經有一個game_id索引,爲什麼不使它獨特?這樣數據庫不會允許重複輸入,即使mongoid不能正確地進行驗證(@vidaica的答案描述了mongoid如何驗證唯一性)。

嘗試添加唯一索引
index({"game_id" => 1}, {unique: true})
然後
rake db:mongoid:create_indexes

在蒙戈創建它們(請確保它是從蒙戈外殼創建)。

之後,MongoDB的不應該重複game_id堅持任何記錄,你必須做的紅寶石層來處理,你會收到來自mongodb的插件錯誤。

0

也許你應該做的,而不是插入的UPSERT:

obj = new(game_id: json_document['game_id']) 
obj.upsert 
1

不管你做什麼,你將要解決這個數據庫級,因爲你幾乎肯定會做執行唯一約束隨後的最糟糕的工作孟戈人做了什麼。

假設你會想要分割一天或考慮mongo,因爲它的水平可伸縮性功能(你正在做大量測試,所以我認爲這是你不想排除的設計),可能有沒有可靠的方法來做到這一點(見Ramifications of working with a mongodb clustersharding concepts):

假設我們被分片對電子郵件,並希望對用戶名唯一索引。這不可能通過集羣強制執行。

但是,如果你在分片或game_id你不考慮拆分在所有然後game_id設置一個唯一索引應避免重複記錄(見@xlembouras答案)。

但是,當該索引因競爭條件而被違反時,該答案可能無法防止異常,因此請務必解救該異常並執行更新,而不是在救援塊中創建(可能通過與@new_record (click 'Show source')一起播放,嘗試查找時間給你確切的代碼)。

UPDATE,短期快速的回答

begin 
    a = Album.new(name: 'foo', game_id: 3) 
    a.save 
rescue 
    a.id = id_of_the_object_with_same_id_already_in_db 
    a.instance_variable_set('@new_record', false) 
    a.save 
end