1

我在更改用於我使用Mongo DB構建的時間序列數據庫的模式時遇到了一些問題。目前,我有記錄,如下面所示:彙總將文檔鍵展開爲新文檔

{ 
    "_id" : 20, 
    "name" : "Bob, 
    "location" : "London", 
    "01/01/1993" : { 
     "height" : "110cm", 
     "weight" : "60kg", 
    }, 
    "02/01/1993" : { 
     "height" : "112cm", 
     "weight" : "61kg", 
    } 

}

我想使用聚合框架來創建一些記錄每個「人」,每一個「時間價值」的子文檔中原始記錄:

{ 
    "_id" : 20, 
    "name" : "Bob, 
    "date" : "01/01/1993" 
    "location" : "London", 
    "height" : "110cm", 
    "weight" : "60kg", 
}, 

{ 
    "_id" : 20, 
    "name" : "Bob, 
    "date" : "02/01/1993" 
    "location" : "London", 
    "height" : "112cm", 
    "weight" : "61kg", 
} 

加入了大量的時間序列值的每個記錄時,新方案應該是更有效,我不應該碰到一個最大原稿尺寸錯誤!

任何關於如何使用MongoDB聚合管道來做到這一點的幫助將不勝感激!

回答

1

雖然聚合框架的現代版本中有功能可以讓你做這種事,但里程可能會有所不同,以至於它實際上是否是最好的解決方案。

實質上,您可以創建一個條目數組,其中包含文檔鍵「不包含」其他頂級鍵,然後將包含在文檔中。該數組然後可以用$unwind處理,整個結果重新塑造成新的文件:

db.getCollection('input').aggregate([ 
    { "$project": { 
    "name": 1, 
    "location": 1, 
    "data": { 
     "$filter": { 
     "input": { "$objectToArray": "$$ROOT" }, 
     "as": "d", 
     "cond": { 
      "$not": { "$in": [ "$$d.k", ["_id","name","location"] ] }  
     } 
     } 
    } 
    }}, 
    { "$unwind": "$data" }, 
    { "$replaceRoot": { 
    "newRoot": { 
     "$arrayToObject": { 
     "$concatArrays": [ 
      [{ "k": "id", "v": "$_id" }, 
      { "k": "name", "v": "$name" }, 
      { "k": "location", "v": "$location" }, 
      { "k": "date", "v": "$data.k" }], 
      { "$objectToArray": "$data.v" } 
     ] 
     } 
    } 
    }}, 
    { "$out": "output" } 
]) 
數組元素內的初始 $project生產

或交替完成所有的整形:

db.getCollection('input').aggregate([ 
    { "$project": { 
    "_id": 0, 
    "data": { 
     "$map": { 
     "input": { 
      "$filter": { 
      "input": { "$objectToArray": "$$ROOT" }, 
      "as": "d", 
      "cond": { 
       "$not": { "$in": [ "$$d.k", ["_id", "name", "location"] ] }  
      } 
      } 
     }, 
     "as": "d", 
     "in": { 
      "$arrayToObject": { 
      "$concatArrays": [ 
       { "$filter": { 
       "input": { "$objectToArray": "$$ROOT" }, 
       "as": "r", 
       "cond": { "$in": [ "$$r.k", ["_id", "name", "location"] ] } 
       }}, 
       [{ "k": "date", "v": "$$d.k" }], 
       { "$objectToArray": "$$d.v" } 
      ] 
      } 
     } 
     } 
    } 
    }}, 
    { "$unwind": "$data" }, 
    { "$replaceRoot": { "newRoot": "$data" } }, 
    { "$out": "output" } 
]) 

使您在使用$objectToArray$filter,以便從實際上包含每個日期的數據點的關鍵字創建一個數組。

$unwind後,我們基本上以構建newRoot$replaceRoot上的「陣列格式」一組命名鍵的應用$arrayToObject,然後根據使用$out每個數據關鍵一個新文件寫入到新的集合。

雖然這可能只會讓你的一部分,你真的應該將"date"數據更改爲BSON日期。它需要更少的存儲空間,並且更容易查詢。

var updates = []; 
db.getCollection('output').find().forEach(d => { 
    updates.push({ 
    "updateOne": { 
     "filter": { "_id": d._id }, 
     "update": { 
     "$set": { 
      "date": new Date(
      Date.UTC.apply(null, 
       d.date.split('/') 
       .reverse().map((e,i) => (i == 1) ? parseInt(e)-1: parseInt(e)) 
      ) 
     ) 
     } 
     } 
    } 
    }); 
    if (updates.length >= 500) { 
    db.getCollection('output').bulkWrite(updates); 
    updates = []; 
    } 
}) 

if (updates.length != 0) { 
    db.getCollection('output').bulkWrite(updates); 
    updates = []; 
} 

當然,如果你的MongoDB服務器缺少這些聚合功能,那麼你最好還是先通過迭代循環擺在首位寫輸出到一個新的集合:

var output = []; 

db.getCollection('input').find().forEach(d => { 
    output = [ 
    ...output, 
    ...Object.keys(d) 
     .filter(k => ['_id','name','location'].indexOf(k) === -1) 
     .map(k => Object.assign(
     { 
      id: d._id, 
      name: d.name, 
      location: d.location, 
      date: new Date(
      Date.UTC.apply(null, 
       k.split('/') 
       .reverse().map((e,i) => (i == 1) ? parseInt(e)-1: parseInt(e)) 
      ) 
     ) 
     }, 
     d[k] 
    )) 
    ]; 

    if (output.length >= 500) { 
    db.getCollection('output').insertMany(output); 
    output = [];  
    } 
}) 

if (output.length != 0) { 
    db.getCollection('output').insertMany(output); 
    output = []; 
} 

在任這些情況下,我們希望將Date.UTC應用於基於現有「字符串」日期的反向字符串元素,並獲得比可投射到BSON日期的值。

聚集框架本身不允許的類型鑄造所以該部分的溶液(它是一個必要的部分)是實際循環和更新,但是使用形成至少使其高效循環和更新。

無論哪種情況給你相同的最終輸出:

/* 1 */ 
{ 
    "_id" : ObjectId("599275b1e38f41729f1d64fe"), 
    "id" : 20.0, 
    "name" : "Bob", 
    "location" : "London", 
    "date" : ISODate("1993-01-01T00:00:00.000Z"), 
    "height" : "110cm", 
    "weight" : "60kg" 
} 

/* 2 */ 
{ 
    "_id" : ObjectId("599275b1e38f41729f1d64ff"), 
    "id" : 20.0, 
    "name" : "Bob", 
    "location" : "London", 
    "date" : ISODate("1993-01-02T00:00:00.000Z"), 
    "height" : "112cm", 
    "weight" : "61kg" 
} 
+0

偉大的答案。在第一部分(聚合管道)中,您使用以下內容:「$ not」:{「$ in」:[「$$ d.k」,[「_id」,「name」,「location」]]}。 $$ d.k指的是什麼?我知道這是用來'跳過'對象到數組命令的_id,名稱和位置字段,但不確定確切的方法! – Ctrp

+0

我似乎無法在此語句前面的代碼中找到對'k'的引用 – Ctrp

+0

@Ctrp您需要查看答案中提供的文檔鏈接,特別是在'$ objectToArray'處。所做的是將輸入「對象」的每個「關鍵」和「值」,併產生一個「數組」,其中的條目如「[{」k「:」01/01/1993「,」v「:{」height 「:」110cm「,」weight「:」60kg「}},]'。這就是'k'和'​​v'在整個列表中的含義。 –