0

如何將MongoDB聚合的結果輸出到集合中,而無需從另一個聚合輸出中替換集合?從多個集合中彙總

我需要只有$出的數據:「tempCollection」,因爲我有500mln文件,並獲得pipeline stage limit

var q = [ 
    {$match: query}, 
    {$group: {_id: '$hash'}}, 
    {$out: 'tempCollection'} 
]; 

async.parallel([ 
    function(callback) { 
    firstCollection.aggregate(q, callback); 
    }, 
    function(callback) { 
    secondCollection.aggregate(q, callback); 
    }, 

    ... 

], function() { 

    // I want to get all from tempCollection (with pagination) here 

}); 
+1

錯誤的建立你的問題。 ** $ ** **總是**取代。你真的想在這裏做什麼? 「添加」兩個結果在一個集合?或者,根據某些共同的價值「合併」「積累」來自兩個結果的其他價值?還要具體說明這是基本的節點驅動程序還是像貓鼬或僧侶之類的其他東西。 –

+0

我使用貓鼬。我需要以任何方式獲取所有不同的散列值(合併或寫入一個集合等)。 –

+0

挑一個。 「合併」 - 意味着您有一個共同的「鍵」或組成「鍵」的字段,並且您打算在找到相同的鍵時「增加」其他值。 「連接」 - 意味着您只需要將兩組結果結尾在一個集合中。注意在後者中,「關鍵」確實需要不同,或者人爲製造。 –

回答

1

這裏的底線是,$out選項只有永遠「取代」輸出到目標集合上。所以要做其他事情,你必須通過客戶端連接而不是僅僅輸出到服務器。

這裏的貓鼬最好的選擇是直接進入底層驅動程序,並獲得驅動程序支持的node stream interface

繁瑣的例子,但它顯示了結構的基本途徑:

var async = require('async'), 
    mongoose = require('mongoose'), 
    Schema = mongoose.Schema; 

mongoose.connect('mongodb://localhost/aggtest'); 

var testSchema = new Schema({},{ "_id": false, strict: false }); 


var ModelA = mongoose.model('ModelA', testSchema), 
    ModelB = mongoose.model('ModelB', testSchema), 
    ModelC = mongoose.model('ModelC', testSchema); 

function processCursor(cursor,target,callback) { 

    cursor.on("end",callback); 
    cursor.on("error",callback); 

    cursor.on("data",function(data) { 
    cursor.pause(); 
    target.update(
     { "_id": data._id }, 
     { "$setOnInsert": { "_id": data._id } }, 
     { "upsert": true }, 
     function(err) { 
     if (err) callback(err); 
     cursor.resume(); 
     } 
    ); 
    }); 
} 

async.series(
    [ 
    // Clean data 
    function(callback) { 
     async.each([ModelA,ModelB,ModelC],function(model,callback) { 
     model.remove({},callback); 
     },callback); 
    }, 

    // Sample data 
    function(callback) { 
     async.each([ModelA,ModelB],function(model,callback) { 
     async.each([1,2,3],function(id,callback) { 
      model.create({ "_id": id },callback); 
     },callback); 
     },callback); 
    }, 

    // Run merge 
    function(callback) { 
     async.parallel(
     [ 
      function(callback) { 
      var cursor = ModelA.collection.aggregate(
       [ 
       { "$group": { "_id": "$_id" } } 
       ], 
       { "batchSize": 25 } 
      ); 

      processCursor(cursor,ModelC,callback) 
      }, 
      function(callback) { 

      var cursor = ModelB.collection.aggregate(
       [ 
       { "$group": { "_id": "$_id" } } 
       ], 
       { "batchSize": 25 } 
      ); 

      processCursor(cursor,ModelC,callback) 
      } 
     ], 
     callback 
    ); 
    }, 

    // Get merged 
    function(callback) { 
     ModelC.find({},function(err,results) { 
     console.log(results); 
     callback(err); 
     }); 
    } 
    ], 
    function(err) { 
    if (err) throw err; 
    mongoose.disconnect(); 
    } 
); 

Oustide這一點,那麼你將需要$out爲「獨立」的集合,然後用類似.update()合併它們在過程,但要保持它「服務器端」,那麼你需要使用.eval()

這並不好,但這是保持服務器運行的唯一方法。您還可以使用"Bulk"操作(再次通過相同的本地.collection接口)對此進行修改,以獲得更多的吞吐量。但是選項歸結爲「通讀客戶」或「評估」。