2017-01-04 62 views
1

您好所有的stackoverflow用戶,我目前面臨一個關於mongodb & nodejs的問題。我試圖將數據插入到mongodb中,並且我已準備好將所有數據插入到mongodb中。我用來插入的數據是一個csv類型的文件。添加/插入數據到MongoDB中

這是我的項目。我的項目中有一個客戶端和服務器。服務器將從CSV文件中提取數據,監聽客戶端,並將數據從陣列的內存空間中的CSV文件發送到客戶端。客戶端將連接到服務器,請求數據並將數據追加到mongodb中。

client.js以下代碼很可能會幫助我使用它的值將它附加到mongodb中。有人可以幫我嗎?

monitoredItem.on("changed",function(dataValue){ 
      console.log(" New Data Receive = ",dataValue.value.value); 
     }); 

不確定如何將數據添加到MongoDB的

這是我client.js

/*global require,console,setTimeout */ 
 
var opcua = require("node-opcua"); 
 
var async = require("async"); 
 
var fs = require("fs"); 
 
var csv = require("fast-csv"); 
 
var sleep = require("system-sleep"); 
 

 
var client = new opcua.OPCUAClient(); 
 
var endpointUrl = "opc.tcp://" + require("os").hostname() + ":4334/UA/MyLittleServer"; 
 

 

 
var the_session, the_subscription; 
 

 
async.series([ 
 

 
    // step 1 : connect to 
 
    function(callback) { 
 
     client.connect(endpointUrl,function (err) { 
 
      if(err) { 
 
       console.log(" cannot connect to endpoint :" , endpointUrl); 
 
      } else { 
 
       console.log("connected !"); 
 
\t \t console.log("Endpoint URL ", endpointUrl); 
 
      } 
 
      callback(err); 
 
     }); 
 
    }, 
 

 
    // step 2 : createSession 
 
    function(callback) { 
 
     client.createSession(function(err,session) { 
 
      if(!err) { 
 
       the_session = session; 
 
      } 
 
      callback(err); 
 
     }); 
 
    }, 
 

 
    // step 3 : browse 
 
    function(callback) { 
 
     the_session.browse("RootFolder", function(err,browse_result){ 
 
      if(!err) { 
 
       browse_result[0].references.forEach(function(reference) { 
 
        console.log(reference.browseName.toString()); 
 
       }); 
 
      } 
 
      callback(err); 
 
     }); 
 
    }, 
 

 
    // step 4 : read a variable with readVariableValue 
 
    //function(callback) { 
 
    // the_session.readVariableValue("ns=2000;s=TEST", function(err,dataValue) { 
 
    //  if (!err) { 
 
    //   console.log(" free mem % = " , dataValue.toString()); 
 
    //  } 
 
    //  callback(err); 
 
    // }); 
 
    //}, 
 
    
 
    // step 4' : read a variable with read 
 
    //function(callback) { 
 
    // var max_age = 0; 
 
    // var nodes_to_read = [ 
 
    //  { nodeId: "ns=2000;s=TEST", attributeId: opcua.AttributeIds.Value } 
 
    // ]; 
 
    // the_session.read(nodes_to_read, max_age, function(err,nodes_to_read,dataValues) { 
 
    //  if (!err) { 
 
    //   console.log(" free mem % = " , dataValues[0]); 
 
    //  } 
 
    //  callback(err); 
 
    // }); 
 
    //}, 
 

 

 
// \t function(callback){ 
 
// \t \t the_session.readVariableValue("ns=74;s=Dou", function(err,dataValue) { 
 
// \t \t \t if(!err){ 
 
// \t \t \t \t console.log("Test Success", dataValue.toString()); 
 
// \t \t \t } 
 
// \t \t \t callback(err); 
 
// \t \t \t }); 
 
// \t \t }, 
 
// 
 
// \t function(callback){ 
 
// \t \t the_session.readVariableValue("ns=74;s=Float", function(err,dataValue) { 
 
// \t \t \t if(!err){ 
 
// \t \t \t \t console.log("Test Success", dataValue.toString()); 
 
// \t \t \t } 
 
// \t \t \t callback(err); 
 
// \t \t \t }); 
 
// \t \t }, 
 
// 
 
// \t function(callback){ 
 
// \t \t the_session.readVariableValue("ns=74;s=String", function(err,dataValue) { 
 
// \t \t \t if(!err){ 
 
// \t \t \t \t console.log("Test Success", dataValue.toString()); 
 
// \t \t \t } 
 
// \t \t \t callback(err); 
 
// \t \t \t }); 
 
// \t \t }, 
 
\t 
 
// \t function(callback){ 
 
// \t \t the_session.readVariableValue("ns=1;s=CSV", function(err, dataValue) { 
 
// \t \t \t if(!err){ 
 
// \t \t \t \t console.log(dataValue.toString()); 
 
// \t \t \t \t sleep(5000); 
 
// \t \t \t } 
 
// \t \t \t callback(err); 
 
// \t \t \t }); 
 
// \t \t }, 
 

 
// \t function(callback){ 
 
// \t \t the_session.readVariableValue("ns=1;s=CSV", function(err, dataValue) { 
 
// \t \t \t if(!err){ 
 
// \t \t \t \t fs.createReadStream(dataValue.toString()) 
 
// \t \t \t \t console.log(dataValue.toString()); 
 
// \t \t \t \t sleep(5000); 
 
// \t \t \t \t \t .pipe(csv()) 
 
// \t \t \t \t \t .on('data', function(data){ 
 
// \t \t \t \t \t console.log(csv); 
 
// \t \t \t \t \t sleep(5000); 
 
// \t \t \t \t \t }) 
 
// \t \t \t \t \t .op('end', function(data){ 
 
// \t \t \t \t \t console.log("Read Finish") 
 
// \t \t \t \t \t }); 
 
// \t \t \t } 
 
// \t \t \t callback(err); 
 
// \t \t }); 
 
// \t }, 
 

 
\t 
 

 

 
    
 
    // step 5: install a subscription and install a monitored item for 10 seconds 
 
    function(callback) { 
 
     
 
     the_subscription=new opcua.ClientSubscription(the_session,{ 
 
      requestedPublishingInterval: 1000, 
 
      requestedLifetimeCount: 10, 
 
      requestedMaxKeepAliveCount: 2, 
 
      maxNotificationsPerPublish: 10, 
 
      publishingEnabled: true, 
 
      priority: 10 
 
     }); 
 
     
 
     the_subscription.on("started",function(){ 
 
      console.log("subscription started for 2 seconds - subscriptionId=",the_subscription.subscriptionId); 
 
     }).on("keepalive",function(){ 
 
      console.log("keepalive"); 
 
     }).on("terminated",function(){ 
 
      callback(); 
 
     }); 
 
     
 
     setTimeout(function(){ 
 
      the_subscription.terminate(); 
 
     },10000000); 
 
     
 
     // install monitored item 
 
    var monitoredItem = the_subscription.monitor({ 
 
      nodeId: opcua.resolveNodeId("ns=2000;s=TEST"), 
 
      attributeId: opcua.AttributeIds.Value 
 
     }, 
 
     { 
 
      samplingInterval: 100, 
 
      discardOldest: true, 
 
      queueSize: 10 
 
     }, 
 
     opcua.read_service.TimestampsToReturn.Both 
 
     ); 
 
     console.log("-------------------------------------"); 
 
     
 
    monitoredItem.on("changed",function(dataValue){ 
 
      console.log(" New Data Receive = ",dataValue.value.value); 
 
     }); 
 
    }, 
 

 
    // close session 
 
    function(callback) { 
 
     the_session.close(function(err){ 
 
      if(err) { 
 
       console.log("session closed failed ?"); 
 
      } 
 
      callback(); 
 
     }); 
 
    } 
 

 
], 
 
function(err) { 
 
    if (err) { 
 
     console.log(" failure ",err); 
 
    } else { 
 
     console.log("done!"); 
 
    } 
 
    client.disconnect(function(){}); 
 
}) ;

這是我server.js

/*global require,setInterval,console */ 
 
var opcua = require("node-opcua"); 
 
var fs = require("fs"); 
 
var csv = require("fast-csv"); 
 
var sleep = require("system-sleep"); 
 

 
var currentCount = 0; 
 
var array = ["No New Data"]; 
 

 
\t fs.createReadStream('1000data.csv') 
 
\t .pipe(csv()) 
 
\t .on('data', function(data){ 
 
\t \t array.push(JSON.stringify(data)); 
 
\t \t //sleep(5); 
 
\t }) 
 
\t .on('end', function(data) { 
 
\t }); 
 

 
\t 
 
// Let's create an instance of OPCUAServer 
 
var server = new opcua.OPCUAServer({ 
 
    port: 4334, // the port of the listening socket of the server 
 
    resourcePath: "UA/MyLittleServer", // this path will be added to the endpoint resource name 
 
    buildInfo : { \t \t \t //Information of the build, Retrieve the server information for the current instance of the db client 
 
     productName: "MySampleServer1", 
 
     buildNumber: "7658", 
 
     buildDate: new Date(2014,5,2) 
 
    } 
 
}); 
 

 

 

 
function post_initialize() { 
 
    console.log("initialized"); 
 
    function construct_my_address_space(server) { 
 
    
 
     var addressSpace = server.engine.addressSpace; 
 
     
 
     // declare a new object 
 
     var device = addressSpace.addObject({ 
 
      organizedBy: addressSpace.rootFolder.objects, 
 
      browseName: "MyDevice" 
 
     }); 
 
     
 
     // add some variables 
 
     // add a variable named MyVariable1 to the newly created folder "MyDevice" 
 
     var variable1 = 1; 
 
     
 
     // emulate variable1 changing every 500 ms 
 
     setInterval(function(){ variable1+=1; }, 1000); 
 
     
 
     addressSpace.addVariable({ 
 
      componentOf: device, 
 
      browseName: "MyVariable1", 
 
      dataType: "Double", 
 
      value: { 
 
       get: function() { 
 
        return new opcua.Variant({dataType: opcua.DataType.Double, value: variable1 }); 
 
       } 
 
      } 
 
     }); 
 

 

 

 

 
// \t var variableTest = "Test"; 
 
       
 
//  addressSpace.addVariable({ 
 
//   componentOf: device, 
 
//   nodeId: "ns=1;b=1020FFAA", 
 
//   browseName: "MyVariableTest", 
 
//   dataType: "String", 
 
//   value: { 
 
//    get: function() { 
 
//     return new opcua.Variant({dataType: opcua.DataType.String, value: variableTest }); 
 
//    }, 
 
//    set: function (variant) { 
 
//     variable2 = parseFloat(variant.value); 
 
//     return opcua.StatusCodes.Good; 
 
//    } 
 
//   }  
 
//  }); 
 
     
 
// \t server.jsonVar = server.engine.addVariable("MyDevice", { 
 
// \t \t browseName: "JSONObject", 
 
// \t \t dataType: "String", 
 
// \t \t value: { 
 
// \t \t \t get:function(){ 
 
// \t \t \t \t return new opcua.Variant({ 
 
// \t \t \t \t \t dataType: opcua.DataType.String, 
 
// \t \t \t \t \t value: get_json_string() 
 
// \t \t \t \t }); 
 
// \t \t \t } 
 
// \t \t } 
 
// \t }); 
 

 
// \t var varTestDou = addressSpace.addVariable({ 
 
// \t \t componentOf: device, 
 
// \t \t nodeId: "ns=74;s=Dou", 
 
// \t \t browseName : "VarD", 
 
// \t \t dataType: "Double", 
 
// \t \t value: new opcua.Variant({dataType: opcua.DataType.Double, value: [10.0,292.31,412.345,185,3453.245]}) 
 

 
// \t }); 
 

 
// \t var varTestFloat = addressSpace.addVariable({ 
 
// \t \t componentOf: device, 
 
// \t \t nodeId: "ns=74;s=Float", 
 
// \t \t browseName : "VarF", 
 
// \t \t dataType: "Float", 
 
// \t \t value: new opcua.Variant({dataType: opcua.DataType.Float, value: [10.0,402.23,123,34,643,34]}) 
 

 
// \t }); 
 

 
// \t var varTestString = addressSpace.addVariable({ 
 
// \t \t componentOf: device, 
 
// \t \t nodeId: "ns=74;s=String", 
 
// \t \t browseName : "VarT", 
 
// \t \t dataType: "String", 
 
// \t \t value: new opcua.Variant({dataType: opcua.DataType.String, value: "001,41,54,87,23,12/3/2016,8:39am"}) 
 

 
// \t }); 
 

 
\t var csvFile = addressSpace.addVariable({ 
 
\t \t componentOf: device, 
 
\t \t nodeId: "ns=1;s=CSV", 
 
\t \t browseName: "csvData", 
 
\t \t dataType: "String", 
 
\t \t value: new opcua.Variant({dataType: opcua.DataType.String, value: array[variable1]}) 
 
\t }); 
 
\t 
 

 

 

 
// \t fs.createReadStream('1000data.csv') 
 
// \t .pipe(csv()) 
 
// \t .on('data', function(data){ 
 
// \t \t console.log("Data uploaded"); 
 
// \t }) 
 
// \t .on('end', function(date) { 
 
// \t \t console.log("Read Finish"); 
 
// \t }); 
 

 

 

 
\t 
 
\t \t //look at this if you want to send data from client to server 
 
     // add a variable named MyVariable2 to the newly created folder "MyDevice" 
 
     var variable2 = 10.0; 
 
     
 
     server.engine.addressSpace.addVariable({ 
 
      
 
      componentOf: device, 
 
      
 
      nodeId: "ns=2000;b=1020FFAA", // some opaque NodeId in namespace 4 
 
      
 
      browseName: "MyVariable2", 
 
      
 
      dataType: "Double",  
 
      
 
      value: { 
 
       get: function() { 
 
        return new opcua.Variant({dataType: opcua.DataType.Double, value: variable2 }); 
 
       }, 
 
       set: function (variant) { 
 
        variable2 = parseFloat(variant.value); 
 
        return opcua.StatusCodes.Good; 
 
       } 
 
      } 
 
     }); 
 

 

 
     var os = require("os"); 
 
     /** 
 
     * returns the percentage of free memory on the running machine 
 
     * @return {double} 
 
     */ 
 
     function available_memory() { 
 
\t \t \t if (currentCount < array.length-1){ 
 
\t \t \t \t currentCount += 1 ; 
 
\t \t \t \t console.log(array[currentCount]); 
 
\t \t \t \t return currentCount; 
 
\t \t \t } else { 
 
\t \t \t \t console.log(array[0]); 
 
\t \t \t \t return 0; 
 
\t \t \t } 
 

 
     } 
 
     server.engine.addressSpace.addVariable({ 
 
      
 
      componentOf: device, 
 
      
 
      nodeId: "ns=2000;s=TEST", // a string nodeID 
 
      browseName: "FreeMemory", 
 
      dataType: "String",  
 
      value: { 
 
       get: function() {return new opcua.Variant({dataType: opcua.DataType.String, value: array[available_memory()] });} 
 
      } 
 
     }); 
 
    } 
 
    construct_my_address_space(server); 
 
    server.start(function() { 
 
     console.log("Server is now listening ... (press CTRL+C to stop)"); 
 
     console.log("port ", server.endpoints[0].port); 
 
     var endpointUrl = server.endpoints[0].endpointDescriptions()[0].endpointUrl; 
 
     console.log(" the primary server endpoint url is ", endpointUrl); 
 
    }); 
 
} 
 
server.initialize(post_initialize);

這是我1000data.csv

Machine Unit,Air Temperature,Water Temperature,Heat Temperature,Room Temperature,Date,Time 
 
1,61,54,87,24,12/3/2016,8:39AM 
 
2,41,57,92,23,29/9/2016,3:51PM 
 
3,39,53,89,25,22/12/2016,5:30PM

回答

0

你可以添加一個功能,它增加了監測,並得到一個被稱爲當數據變化回調。其中您將數據添加到mongoDB。

功能看起來是這樣的:

... 
 

 
function addMonitoring(callback) { 
 
    const monitoredItem = the_subscription.monitor({ 
 
     nodeId: opcua.resolveNodeId("ns=2000;s=TEST"), 
 
     attributeId: opcua.AttributeIds.Value 
 
    }, { 
 
     samplingInterval: 100, 
 
     discardOldest: true, 
 
     queueSize: 10 
 
    }, 
 
    opcua.read_service.TimestampsToReturn.Both 
 
); 
 

 
    monitoredItem.on("changed", dataValue => { 
 
    if (dataValue === undefined || dataValue.value === null) { 
 
     callback(`The subscription for the Opc Servernode failed.`); 
 
    } else { 
 
     callback(null, dataValue.value.value); 
 
    } 
 
    }); 
 
} 
 

 
...

現在在第5步中您可以添加類似:

import mongooseModel from 'mongooseModel'; 
 

 
... 
 

 
setTimeout(function() { 
 
    the_subscription.terminate(); 
 
}, 10000000); 
 

 
addMonitoring((error, value) => { 
 
    if (!error) { 
 
    mongooseModel.findByIdAndUpdate(1 /*your id*/ , { 
 
     $push: { 
 
     "data": value 
 
     } 
 
    }) 
 
    } else { 
 
    // handle error 
 
    } 
 
});

ŧ他的模型可能是這樣的:

... 
 

 
const schema = mongoose.Schema({ 
 
    data: { 
 
    type: [Number], 
 
    required: true 
 
    } 
 
    timestamp: { 
 
    type: Date, 
 
    default: Date.now, 
 
    }, 
 
}); 
 

 
export default mongoose.model("TestData", schema); 
 

 
...

的數據庫的模式取決於您要保存

數據