我看到很多SO關於MongoDB聚合的問題,但是,我還沒有找到一個完整的解決方案。通過映射的MongoDB獨特的價值聚合reduce
這裏是我的數據的例子:
{
"fruits" : {
"apple" : "red",
"orange" : "orange",
"plum" : "purple"
}
}
{
"fruits" : {
"apple" : "green",
"plum" : "purple"
}
}
{
"fruits" : {
"apple" : "red",
"orange" : "yellow",
"plum" : "purple"
}
}
現在,我的目標是確定每種顏色的每個水果的普及,所以像這將是輸出集合:
{
"_id" : "apple"
"values" : {
"red" : 2,
"green" : 1
}
}
{
"_id" : "orange"
"values" : {
"orange" : 1,
"yellow" : 1
}
}
{
"_id" : "plum"
"values" : {
"purple" : 3
}
}
我已經嘗試了各種M/R功能,最後他們要麼不工作,要麼以指數級長。在這個例子(水果)的背景下,我有大約1,000,000個總文件,大約有1000種不同的水果和100,000種顏色。我目前的工作M/R是這樣的:
map = function() {
if (!this.fruits) return;
for (var fruit in this.fruits) {
emit(fruit, {
val_array: [
{value: this.fruits[fruit], count: 1}
]
});
}
};
reduce = function(key, values) {
var collection = {
val_array: []
};
var found = false;
values.forEach(function(map_obj) {
map_obj.val_array.forEach(function(value_obj) {
found = false;
// if exists in collection, inc, else add
collection.val_array.forEach(function(coll_obj) {
if (coll_obj.value == value_obj.value) {
// the collection already has this object, increment it
coll_obj.count += value_obj.count;
found = true;
return;
}
});
if (!found) {
// the collection doesn't have this obj yet, push it
collection.val_array.push(value_obj);
}
});
});
return collection;
};
現在,這樣做的工作,併爲100個記錄,它只需一秒鐘左右,但時間非線性增加,所以100M記錄將採取很長時間,很。問題是我在collection
數組中使用reduce函數進行窮人子聚合,因此需要我迭代collection
和我的map函數中的值。現在我只需要弄清楚如何有效地做到這一點(即使它需要多次減少)。歡迎任何建議!
編輯缺少一個更好的地方發佈它,這是我的解決方案。
首先,我創建了一個名爲
mr.js
文件:在我的整個集合
map = function() {
if (!this.fruits) return;
var skip_fruits = {
'Watermelon':1,
'Grapefruit':1,
'Tomato':1 // yes, a tomato is a fruit
}
for (var fruit in this.fruits) {
if (skip_fruits[fruit]) continue;
var obj = {};
obj[this.fruits[fruit]] = 1;
emit(fruit, obj);
}
};
reduce = function(key, values) {
var out_values = {};
values.forEach(function(v) {
for(var k in v) { // iterate values
if (!out_values[k]) {
out_values[k] = v[k]; // init missing counter
} else {
out_values[k] += v[k];
}
}
});
return out_values;
};
var in_coll = "fruit_repo";
var out_coll = "fruit_agg_so";
var total_docs = db[in_coll].count();
var page_size = 100000;
var pages = Math.floor(total_docs/page_size);
print('Starting incremental MR job with '+pages+' pages');
db[out_coll].drop();
for (var i=0; i<pages; i++) {
var skip = page_size * i;
print("Calculating page limits for "+skip+" - "+(skip+page_size-1)+"...");
var start_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip).limit(1)[0].date;
var end_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip+page_size-1).limit(1)[0].date;
var mr_command = {
mapreduce: in_coll,
map: map,
reduce: reduce,
out: {reduce: out_coll},
sort: {date: 1},
query: {
date: {
$gte: start_date,
$lt: end_date
}
},
limit: (page_size - 1)
};
print("Running mapreduce for "+skip+" - "+(skip+page_size-1));
db[in_coll].runCommand(mr_command);
}
該文件迭代,逐步映射/減少100K文檔(由date
排序其中必須有索引!)的時間,並減少他們成爲一個單一的輸出集合。它的用法如下:mongo db_name mr.js
。
然後,幾個小時後,我收集了所有的信息。爲了弄清楚哪些水果有大多數顏色,我用這個從蒙戈外殼打印出前25名:
// Show number of number of possible values per key
var keys = [];
for (var c = db.fruit_agg_so.find(); c.hasNext();) {
var obj = c.next();
if (!obj.value) break;
var len=0;for(var l in obj.value){len++;}
keys.push({key: obj['_id'], value: len});
}
keys.sort(function(a, b){
if (a.value == b.value) return 0;
return (a.value > b.value)? -1: 1;
});
for (var i=0; i<20; i++) {
print(keys[i].key+':'+keys[i].value);
}
這個方法的很酷的事情是,因爲它是漸進的,我可以與輸出工作數據,而mapreduce正在運行。
哇,我真的在想那個,不是我!這確實做到了我想要的。我用100,1000和100,000條記錄對它進行了測試,每個集合的運行速度約爲20k/sec(在這些大小上顯然是線性的)。我現在正在運行完整的10M記錄,我可以看到,隨着映射數據的批量變大,減少它們需要相當長的時間('colors'對象必須增長):''secs_running「:488,」msg 「:」m/r:(1/3)排出383999/10752083 3%「'。 – SteveK
順便說一下,我不能使用'emit(fruit,{this.fruits [fruit]:1});'因爲密鑰是動態生成的,所以我用這個JS hack代替:'var obj = {}; obj [this.fruits [水果]] = 1;散發(水果,obj);'。 – SteveK
我會建議嘗試部分工作。也就是說,批量處理文件100k(或其他),然後在最後的工作中減少它。這可能很難實現,所以如果是一次性的,我不會打擾。 :) –