2012-05-06 126 views
4

我看到很多SO關於MongoDB聚合的問題,但是,我還沒有找到一個完整的解決方案。通過映射的MongoDB獨特的價值聚合reduce

這裏是我的數據的例子:

{ 
    "fruits" : { 
     "apple" : "red", 
     "orange" : "orange", 
     "plum" : "purple" 
    } 
} 
{ 
    "fruits" : { 
     "apple" : "green", 
     "plum" : "purple" 
    } 
} 
{ 
    "fruits" : { 
     "apple" : "red", 
     "orange" : "yellow", 
     "plum" : "purple" 
    } 
} 

現在,我的目標是確定每種顏色的每個水果的普及,所以像這將是輸出集合:

{ 
    "_id" : "apple" 
    "values" : { 
     "red" : 2, 
     "green" : 1 
    } 
} 
{ 
    "_id" : "orange" 
    "values" : { 
     "orange" : 1, 
     "yellow" : 1 
    } 
} 
{ 
    "_id" : "plum" 
    "values" : { 
     "purple" : 3 
    } 
} 

我已經嘗試了各種M/R功能,最後他們要麼不工作,要麼以指數級長。在這個例子(水果)的背景下,我有大約1,000,000個總文件,大約有1000種不同的水果和100,000種顏色。我目前的工作M/R是這樣的:

map = function() { 
    if (!this.fruits) return; 
    for (var fruit in this.fruits) { 
     emit(fruit, { 
      val_array: [ 
       {value: this.fruits[fruit], count: 1} 
      ] 
     }); 
    } 
}; 

reduce = function(key, values) { 
    var collection = { 
     val_array: [] 
    }; 
    var found = false; 
    values.forEach(function(map_obj) { 
     map_obj.val_array.forEach(function(value_obj) { 
      found = false; 
      // if exists in collection, inc, else add 
      collection.val_array.forEach(function(coll_obj) { 
       if (coll_obj.value == value_obj.value) { 
        // the collection already has this object, increment it 
        coll_obj.count += value_obj.count; 
        found = true; 
        return; 
       } 
      }); 
      if (!found) { 
       // the collection doesn't have this obj yet, push it 
       collection.val_array.push(value_obj); 
      } 
     }); 
    }); 
    return collection; 
}; 

現在,這樣做的工作,併爲100個記錄,它只需一秒鐘左右,但時間非線性增加,所以100M記錄將採取很長時間,很。問題是我在collection數組中使用reduce函數進行窮人子聚合,因此需要我迭代collection和我的map函數中的值。現在我只需要弄清楚如何有效地做到這一點(即使它需要多次減少)。歡迎任何建議!


編輯缺少一個更好的地方發佈它,這是我的解決方案。
首先,我創建了一個名爲 mr.js文件:在我的整個集合

map = function() { 
    if (!this.fruits) return; 
    var skip_fruits = { 
     'Watermelon':1, 
     'Grapefruit':1, 
     'Tomato':1 // yes, a tomato is a fruit 
    } 
    for (var fruit in this.fruits) { 
     if (skip_fruits[fruit]) continue; 
     var obj = {}; 
     obj[this.fruits[fruit]] = 1; 
     emit(fruit, obj); 
    } 
}; 

reduce = function(key, values) { 
    var out_values = {}; 
    values.forEach(function(v) { 
     for(var k in v) { // iterate values 
      if (!out_values[k]) { 
       out_values[k] = v[k]; // init missing counter 
      } else { 
       out_values[k] += v[k]; 
      } 
     } 
    }); 
    return out_values; 
}; 

var in_coll = "fruit_repo"; 
var out_coll = "fruit_agg_so"; 
var total_docs = db[in_coll].count(); 
var page_size = 100000; 
var pages = Math.floor(total_docs/page_size); 
print('Starting incremental MR job with '+pages+' pages'); 
db[out_coll].drop(); 
for (var i=0; i<pages; i++) { 
    var skip = page_size * i; 
    print("Calculating page limits for "+skip+" - "+(skip+page_size-1)+"..."); 
    var start_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip).limit(1)[0].date; 
    var end_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip+page_size-1).limit(1)[0].date; 
    var mr_command = { 
     mapreduce: in_coll, 
     map: map, 
     reduce: reduce, 
     out: {reduce: out_coll}, 
     sort: {date: 1}, 
     query: { 
      date: { 
       $gte: start_date, 
       $lt: end_date 
      } 
     }, 
     limit: (page_size - 1) 
    }; 
    print("Running mapreduce for "+skip+" - "+(skip+page_size-1)); 
    db[in_coll].runCommand(mr_command); 
} 

該文件迭代,逐步映射/減少100K文檔(由date排序其中必須有索引!)的時間,並減少他們成爲一個單一的輸出集合。它的用法如下:mongo db_name mr.js

然後,幾個小時後,我收集了所有的信息。爲了弄清楚哪些水果有大多數顏色,我用這個從蒙戈外殼打印出前25名:

// Show number of number of possible values per key 
var keys = []; 
for (var c = db.fruit_agg_so.find(); c.hasNext();) { 
    var obj = c.next(); 
    if (!obj.value) break; 
    var len=0;for(var l in obj.value){len++;} 
    keys.push({key: obj['_id'], value: len}); 
} 
keys.sort(function(a, b){ 
    if (a.value == b.value) return 0; 
    return (a.value > b.value)? -1: 1; 
}); 
for (var i=0; i<20; i++) { 
    print(keys[i].key+':'+keys[i].value); 
} 

這個方法的很酷的事情是,因爲它是漸進的,我可以與輸出工作數據,而mapreduce正在運行。

回答

8

看來你並不真的需要val_array。爲什麼不使用簡單的哈希?試試這個:

map = function() { 
    if (!this.fruits) return; 
    for (var fruit in this.fruits) { 
     emit(fruit, 
      {this.fruits[fruit]: 1}); 
    } 
}; 

reduce = function(key, values) { 
    var colors = {}; 

    values.forEach(function(v) { 
    for(var k in v) { // iterate colors 
     if(!colors[k]) // init missing counter 
     colors[k] = 0 

     color[k] += v[k]; 
    } 
    }); 

    return colors; 
} 
+0

哇,我真的在想那個,不是我!這確實做到了我想要的。我用100,1000和100,000條記錄對它進行了測試,每個集合的運行速度約爲20k/sec(在這些大小上顯然是線性的)。我現在正在運行完整的10M記錄,我可以看到,隨着映射數據的批量變大,減少它們需要相當長的時間('colors'對象必須增長):''secs_running「:488,」msg 「:」m/r:(1/3)排出383999/10752083 3%「'。 – SteveK

+0

順便說一下,我不能使用'emit(fruit,{this.fruits [fruit]:1});'因爲密鑰是動態生成的,所以我用這個JS hack代替:'var obj = {}; obj [this.fruits [水果]] = 1;散發(水果,obj);'。 – SteveK

+0

我會建議嘗試部分工作。也就是說,批量處理文件100k(或其他),然後在最後的工作中減少它。這可能很難實現,所以如果是一次性的,我不會打擾。 :) –

0

我很遺憾地告訴你這一點,但MongoDB的MapReduce框架是非常緩慢的,並可能會繼續如此的「相當長的一段」(我不會期望的改善是在他們的路線圖上)。

簡單地說,我的回答將是我不會做,與蒙戈-MapReduce的,而是注重與新的聚合框架的幫助下實現它: http://docs.mongodb.org/manual/reference/aggregation/

或跑在最前面的Hadoop: http://www.slideshare.net/spf13/mongodb-and-hadoop(很好,簡單的介紹)

我也有問題,使用實施MapReduce功能時,MongoDB緩慢,我的結論是,即使在做最簡單的任務,它甚至不會接近上述兩個解決方案涉及到性能。使用新的彙總框架,您可以輕鬆地在商品硬件上處理> 1M文檔/秒(甚至更多)。