2017-06-08 26 views
0

背景

mongodb映射到rails/mongoid類的兩個文檔。這兩個班級分別是TaskSubscription。出於性能原因,Subscription.current_task存儲Task::CurrentTask,其中包含Task的屬性的子集,但匹配預訂的實際當前任務是給定Task#subscription_id的最高Task#pos的那個。如何使用Mongoid進行重新縮減以聚合來自兩個不同字段的數據?

問題

一些不一致的某些屬性之間出現了從Subscription.current_task和應匹配Task,特別是state領域。

目標

清單Subscription S的不匹配的最後一個任務此訂閱的當前所有任務。

解決方案旨在

首先,地圖/減少Task得到最後一個爲每個預訂並存儲到一個臨時的集合。第三,利用Subscription上的這個臨時集合進行再次縮減,以便爲每個訂閱獲得包含實際最後任務和當前嵌入子集副本的對象。第三,爲實際和複製任務不匹配的元素創建報告。

難度遇到

雖然已讀取的雜項和official mongodbmangoid documentation,和其它的例子。像MongoDB Map Re-Reduce and joins – performance tuningMongoDB, Mongoid, MapReduce and Embedded Documents.這樣的博客,我仍然無法找到適用於rereduce步驟的工作解決方案。

的非功能性溶液到目前爲止寫道:

# map/reduce of tasks to get the last one of each subscripton 
last_task_map = %Q{ 
    function() { 
    var key = this.subscription_id; 
    var value = { 
     task: { 
      pos: this.pos, 
      task_id: this._id, 
      state: this.state 
     }, 
     current_task: null 
    }; 
    emit(key, value); 
    } 
} 
last_task_reduce = %Q{ 
    function(key, tasks) { 
    var last_task = tasks[0]; 
    for (var i=1; i < tasks.length; i++) { 
     if(tasks[i].pos > last_task.pos) { 
     last_task = tasks[i]; 
     } 
    } 

    var value = { 
     task: {pos: last_task.pos, task_id: last_task.task_id, state: last_task.state}, 
     current_task: null 
    }; 
    return value; 
    } 
} 

# map/reduce of `current_task`s to merged with previous results 
subscription_map = %Q{ 
    function() { 
    if(!this.current_task) { 
     return; 
    } 
    var key = this._id; 
    var value = { 
     task: null, 
     current_task: { 
     pos: this.current_task.pos, 
     task_id: this.current_task.task_id, 
     state: this.current_task.state, 
     source: 'current_task', 
     } 
    }; 
    emit(key, value); 
    }; 
} 

reduce = %Q{ 
    function(key, tasks) { 
    if(tasks[0].current_task == nill) { 
     return {task: tasks[0].task, current_task: tasks[1].current_task}; 
    } 
    return {task: tasks[1].task, current_task: tasks[0].current_task}; 
    } 
} 


buffer = 'current_task_consistency' 
# temporary collection seems unremoved when serially calling the script with 
# `load` in a `rails c` prompt, so we drop it to avoid unwanted glitch merge 
Mongoid.default_client[buffer].drop 
t = Task.map_reduce(last_task_map, last_task_reduce).out(replace: buffer) 
s = Subscription.map_reduce(subscription_map, reduce).out(reduce: buffer) 
t.each{ |e| puts e } # ok: `{"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0}, "current_task"=>nil}}` 
puts t.counts # ok: {"input"=>83900, "emit"=>83900, "reduce"=>36115, "output"=>28625} 
s.each{ |e| puts e } # ko: {"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>nil, "current_task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0, "source"=>"current_task"}}} 
puts s.counts # ko: {"input"=>28632, "emit"=>28624, "reduce"=>0, "output"=>28624} 

爲第二地圖的預期結果/減少是current_task_consistencysubscription_map結果的合併應全部通過內減少,當根據執行無到counts,實際上s元素的輸出顯示沒有task鍵被賦值爲current_task_consistency值。

相關暴露問題

  • 什麼實施的湖泊問題?
  • 據我所知,此 解決方案確實提供merge函數,這些函數是不相關的,並提供與相應的函數返回一致的輸出 。 對out參數的工作原理以及如何管理 重新降低輸入/輸出應該如何管理有什麼誤解?

補充說明

第三步,生成報告,意在被實現爲應用的第二地圖上的finalize功能/減少。但是也許第三張地圖/縮小圖可能是更好的方法,或者不是。總的來說,實施可能至少從性能角度來看結構糟糕,並且也歡迎反饋。

+1

如果我理解正確,那麼首先從'task'獲取數據,該數據集合在'subscription_id'上,從「pos」返回「最大」值的匹配數據。那麼你想要完成訂閱並將一些數據結合到第一個輸出的結果中?來自「預訂」的任何數據將返回給那些不會從「任務」輸出產生的密鑰?因爲哪裏不會有新的鍵,那麼你可能應該使用聚合和'$ lookup',而不是做兩階段mapReduce。 –

+0

看來你很瞭解這個目標,並且我沒有意識到'$ lookup'的可能性。我會調查一下,一旦我完成了map/reduce/finalize解決方案,對我來說,發現如何在Mongo中查詢這樣的事情也是一個很好的練習。謝謝。 – psychoslave

+0

請參閱[聚合文檔](https://docs.mongodb.com/manual/aggregation/)和[$ lookup documentation](https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/# pipe._S_lookup) – psychoslave

回答

0

提出的解決方案的第一個問題是簡單的ruby/js語法混合,而不是null。不幸的是,劇本失敗了,至少在我正在運行的撬臺中load current_task_consistency.rb

下面是一個工作解決方案,其中包含兩個map/reduce和一個關於生成的臨時集合的查詢。

# map/reduce of tasks to get the last one of each subscripton 
last_task_map = %Q{ 
    function() { 
    var key = this.subscription_id; 
    var value = { 
     task: { 
      pos: this.pos, 
      task_id: this._id, 
      state: this.state 
     }, 
     current_task: null 
    }; 
    emit(key, value); 
    } 
} 
last_task_reduce = %Q{ 
    function(key, tasks) { 
    var last_task = tasks[0]; 
    for (var i=1; i < tasks.length; i++) { 
     if(tasks[i].pos > last_task.pos) { 
     last_task = tasks[i]; 
     } 
    } 

    var value = { 
     task: {pos: last_task.pos, task_id: last_task.task_id, state: last_task.state}, 
     current_task: null 
    }; 
    return value; 
    } 
} 

# map/reduce of `current_task`s merged side by side with the corresponding 
# subscription last task 
subscription_map = %Q{ 
    function() { 
    if(!this.current_task) { 
     return; 
    } 
    var key = this._id; 
    var value = { 
     task: null, 
     current_task: { 
     pos: this.current_task.pos, 
     task_id: this.current_task.task_id, 
     state: this.current_task.state, 
     } 
    }; 
    emit(key, value); 
    }; 
} 

subscription_reduce = %Q{ 
    function(key, tasks) { 
    if(tasks[0].current_task == null) { 
     return {task: tasks[0].task, current_task: tasks[1].current_task}; 
    } 
    return {task: tasks[1].task, current_task: tasks[0].current_task}; 
    } 
} 

buffer = 'current_task_consistency' 
# temporary collection seems unremoved when serially calling the script with 
# `load` in a `rails c` prompt, so we drop it to avoid unwanted merge glitch 
Mongoid.default_client[buffer].drop 

Task.map_reduce(last_task_map, last_task_reduce). 
    out(replace: buffer). 
    execute 

Subscription. 
    map_reduce(subscription_map, subscription_reduce). 
    out(reduce: buffer). 
    execute 

ascertain_inconsistency = %Q{ 
    this.value.current_task == null || 
    this.value.current_task.state != this.value.task.state 
} 

inconsistencies = Mongoid.default_client['current_task_consistency']. 
    find({ "$where": ascertain_inconsistency }) 
相關問題