2014-01-08 104 views
2

我有一個非常大的mongodb項目集合,並且我無法更改模式。簡化版本如下所示:

{event: { address: {ip: "1.1.1.1", port: 80}}} 
{event: { address: {ip: "1.1.1.2", port: 80}}} 
{event: { address: [{ip: "1.1.1.1", port: 80}, {ip: "1.1.1.1", port: 443}]}} 
{event: { address: [{ip: "1.1.1.1", port: 8080}, {ip: "1.1.1.2", port: 443}]}} 

每個事件可能有一個或多個地址。每個地址都有「ip」和「port」。因此,在多個地址的事件中可能會重複「ip」。

我想要做的就是計算每個IP地址的事件數量,並找到頂部的IP地址。對於上面的例子中,首選的結果是:

[ { "ip" : "1.1.1.1", "count" : 3 }, 
    { "ip" : "1.1.1.2", "count" : 2 } ] 

想到的查詢是這樣的:

db.collection.aggregate({$project: {ip: "$event.address.ip"}}, {$group: {_id: "$ip", count: {$sum: 1}}}, {$sort: {count: -1}}, {$limit: 5}) 

但結果是:

{ 
"result" : [ 
    { "_id" : ["1.1.1.1", "1.1.1.2"], "count" : 1 }, 
    { "_id" : ["1.1.1.1", "1.1.1.1"], "count" : 1 }, 
    { "_id" : "1.1.1.2", "count" : 1 }, 
    { "_id" : "1.1.1.1", "count" : 1 } ], 
"ok" : 1 
} 

我不能用$放鬆因爲每個IP地址應該只計算一次每個事件,但某些事件具有相同的IP重複。另外,$ unwind通常不起作用,因爲「address」並不總是一個數組。有些事件只有一個地址不是數組,而$ unwind會爲它們拋出異常。

我嘗試了不同的聚合運算符,例如$ group中的$ addToSet,但都無濟於事。

該集合非常大,我不能首先提取我的應用程序中的所有IP地址,然後計算每個事件。

是否可以使用map/reduce來完成。你會建議什麼?

回答

5

雖然這可以通過MapReduce完成,但Aggregation框架會更快。您需要在計劃中添加兩個步驟 - 1)您需要「格式化」格式,以便地址始終是一個數組,2)然後您需要$展開該數組,然後按_id,ip組排除重複項和然後按ip分組以獲得您需要的計數。

規範化數組和非數組非常棘手,但可以在$unwind之前和之後使用兩個投影來完成。

var p1 = { "$project" : { 
     "array" : { 
      "$cond" : [ 
       { 
        "$eq" : [ 
         "$address.0", 
         [ ] 
        ] 
       }, 
       "$address", 
       [ 
        null 
       ] 
      ] 
     }, 
     "notarray" : { 
      "$cond" : [ 
       { 
        "$ne" : [ 
         "$address.0", 
         [ ] 
        ] 
       }, 
       "$address", 
       [ 
        null 
       ] 
      ] 
     }, 
     "isArray" : { 
      "$eq" : [ 
       "$address.0.ip", 
       [ ] 
      ] 
     } 
    } 
}; 
var u = { "$unwind" : "$array" }; 
var p2 = { "$project" : { 
     "address" : { 
      "$cond" : [ 
       "$isArray", 
       "$array", 
       "$notarray" 
      ] 
     } 
    } 
}; 

相比之下,兩個$group階段是簡單的:

var g1 = { "$group" : { "_id" : { "_id" : "$_id", "ip" : "$address.ip" } } }; 
var g2 = { "$group" : { "_id" : "$_id.ip", "count" : { "$sum" : 1 } } }; 

這裏是我的樣本數據:

> db.coll.find() 
{ "_id" : ObjectId("52cd0badba17f3b7ed212575"), "address" : { "ip" : "1.1.1.1" } } 
{ "_id" : ObjectId("52cd0bc4ba17f3b7ed212576"), "address" : [ { "ip" : "1.1.1.1" }, { "ip" : "1.1.1.1" } ] } 
{ "_id" : ObjectId("52cd0bc9ba17f3b7ed212577"), "address" : [ { "ip" : "1.1.1.1" }, { "ip" : "1.1.1.2" } ] } 

這裏是聚集和其輸出:

> db.coll.aggregate(p1, u, p2, g1, g2) 
{ "_id" : "1.1.1.1", "count" : 3 } 
{ "_id" : "1.1.1.2", "count" : 1 } 
0

啊,你有兩個問題,一個是模式設計不好,另外兩個是不規範的,因此同一個字段至少沒有相同的結構。你在一塊岩石和一個堅硬的地方之間。

如果所有的地址字段都陣列那麼這將很容易地工作,而是你不能現在有條件$unwind,unfortauntely如果試圖$unwind什麼,但一個數組,這樣你可以放鬆的地址,那麼你會得到一個錯誤:

If you specify a target field for $unwind that is not an array, db.collection.aggregate() generates an error.

http://docs.mongodb.org/manual/reference/operator/aggregation/unwind/

所以呀,你這樣的困在這裏。

這可以用MR完成,但分組會很痛苦。就我個人而言,我會做的是運行一個增量MR,以標準格式寫出這個模式,以便可以使用聚合框架。

+0

它可以用聚合fra完成mework。看到我的答案。 –

+0

@AsyaKamsky這是一個瘋狂的條件數量,你確定如果數據集增長,它仍然會更快? – Sammaye

+0

當然它會更快 - mapreduce也將不得不隨着數據集的增長處理更多的文檔/字段。第一個項目有兩個條件(兩個新領域各有一個),第二個項目有一個條件。這並不「瘋狂」 –