2016-11-11 59 views
0

我試圖啓動一個集羣,它將從谷歌雲存儲中流式傳輸文件(新行分隔的JSON),並在從MongoDB獲取數據後轉換每一行。在對行進行轉換之後,我想將它存儲在Google的bigquery中 - 每次10000行。所有這些工作都很好,但問題在於流處理文件的處理速度隨着時間的推移而顯着下降。Node.js可讀流隨着時間的推移而變慢,CPU使用率下降

我在一臺服務器上安裝了節點應用程序,在另一臺服務器上安裝了mongodb。這兩款8核心機器均配備30GB RAM。腳本執行時,最初應用程序服務器和mongodb服務器的CPU使用率約爲70%-75%。 30分鐘後,CPU使用率降至10%,最後降至1%。應用程序不會生成任何例外我可以看到應用程序日誌,並發現它處理完一些文件,並拿起新文件進行處理。一個執行可以在比下午3點晚上幾乎晚上5:20 PM的時候觀察到。

CPU Usage Disk Usage

var cluster = require('cluster'), 
    os = require('os'), 
    numCPUs = os.cpus().length, 
    async = require('async'), 
    fs = require('fs'), 
    google = require('googleapis'), 
    bigqueryV2 = google.bigquery('v2'), 
    gcs = require('@google-cloud/storage')({ 
     projectId: 'someproject', 
     keyFilename: __dirname + '/auth.json' 
    }), 
    dataset = bigquery.dataset('somedataset'), 
    bucket = gcs.bucket('somebucket.appspot.com'), 
    JSONStream = require('JSONStream'), 
    Transform = require('stream').Transform, 
    MongoClient = require('mongodb').MongoClient, 
    mongoUrl = 'mongodb://localhost:27017/bigquery', 
    mDb, 
    groupA, 
    groupB; 

var rows = [], 
    rowsLen = 0; 

function transformer() { 

    var t = new Transform({ objectMode: true }); 

    t._transform = function(row, encoding, cb) { 
     // Get some information from mongodb and attach it to the row 
     if (row) { 
      groupA.findOne({ 
       'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } } 
      }, { 
       fields: { 'properties.OA_SA': 1, _id: 0 } 
      }, function(err, a) { 
       if (err) return cb(); 
       groupB.findOne({ 
        'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } } 
       }, { 
        fields: { 'properties.WZ11CD': 1, _id: 0 } 
       }, function(err, b) { 
        if (err) return cb(); 
        row.groupA = a ? a.properties.OA_SA : null; 
        row.groupB = b ? b.properties.WZ11CD : null; 

        // cache processed rows in memory 
        rows[rowsLen++] = { json: row }; 

        if (rowsLen >= 10000) { 
         // batch insert rows in bigquery table 
         // and free memory 
         log('inserting 10000') 
         insertRowsAsStream(rows.splice(0, 10000)); 
         rowsLen = rows.length; 
        } 

        cb(); 
       }); 
      }); 
     } else { 
      cb(); 
     } 
    }; 

    return t; 
} 

var log = function(str) { 
    console.log(str); 
} 

function insertRowsAsStream(rows, callback) { 
    bigqueryV2.tabledata.insertAll({ 
     "projectId": 'someproject', 
     "datasetId": 'somedataset', 
     "tableId": 'sometable', 
     "resource": { 
      "kind": "bigquery#tableDataInsertAllRequest", 
      "rows": rows 
     } 
    }, function(err, res) { 
     if (res && res.insertErrors && res.insertErrors.length) { 
      console.log(res.insertErrors[0].errors) 
      err = err || new Error(JSON.stringify(res.insertErrors)); 
     } 
    }); 
} 


function startStream(fileName, cb) { 
    // stream a file from Google cloud storage 
    var file = bucket.file(fileName), 
     called = false; 

    log(`Processing file ${fileName}`); 

    file.createReadStream() 
     .on('data', noop) 
     .on('end', function() { 
      if (!called) { 
       called = true; 
       cb(); 
      } 
     }) 
     .pipe(JSONStream.parse()) 
     .pipe(transformer()) 
     .on('finish', function() { 
      log('transformation ended'); 
      if (!called) { 
       called = true; 
       cb(); 
      } 
     }); 
} 

function processFiles(files, cpuIdentifier) { 
    if (files.length == 0) return; 
    var fn = []; 

    for (var i = 0; i < files.length; i++) { 
     fn.push(function(cb) { 
      startStream(files.pop(), cb); 
     }); 
    } 

    // process 3 files in parallel 
    async.parallelLimit(fn, 3, function() { 
     log(`child process ${cpuIdentifier} completed the task`); 
     fs.appendFile(__dirname + '/complete_count.txt', '1'); 
    }); 
} 

if (cluster.isMaster) { 
    for (var ii = 0; ii < numCPUs; ii++) { 
     cluster.fork(); 
    } 
} else { 
    MongoClient.connect(mongoUrl, function(err, db) { 
     if (err) throw (err); 
     mDb = db; 
     groupA = mDb.collection('groupageo'); 
     groupB = mDb.collection('groupbgeo'); 
     processFiles(files, process.pid); 
     // `files` is an array of file names 
     // each file is in newline json delimited format 
     // ["1478854974993/000000000000.json","1478854974993/000000000001.json","1478854974993/000000000002.json","1478854974993/000000000003.json","1478854974993/000000000004.json","1478854974993/000000000005.json"] 
    }); 
} 
+0

內存和硬盤使用情況如何? –

+0

我猜測RAM很好,因爲我沒有得到內存分配/ GC錯誤。爲什麼HD會成爲這個解決方案中的一個問題? –

+0

沒有收到內存分配錯誤並不意味着它沒有問題。過度使用RAM會觸發交換內存使用,反過來會使用硬盤。 –

回答

0

好吧,我已經找到了罪魁禍首! Google API Node.js客戶端庫使用實現Streams 0.8的名爲「stream-events」的模塊。根據消費者使用數據的能力,Streams 0.8不會控制它發出「數據」事件的速率。流量控制功能在Streams 1.0中引入。所以這基本上意味着可讀流將以無法處理的速度在MongoDB上拋出數據。

解決方案: 我使用'request'模塊而不是Google的客戶端庫。我向請求模塊提供了一個簽名的URL,該請求模塊又將結果作爲一個流輸入到我的變壓器中。

帶走: 請務必檢查您用於他們正在使用的流版本的模塊。

相關問題