2015-05-06 76 views
1

我使用 Node.js + RxJS + MongoDB 作爲 socket.io 服務器。經過一定數量的請求後,到數據庫的連接池變得非常大,文件描述符一直得不到釋放,最終導致服務器崩潰。標題:Node 原生 MongoDB 驅動程序的連接池問題

對於DB查詢我用下面的代碼:

/* @flow */ 

var { Observable } = require('rx'); 
var client = require('mongodb'); 
var { assign } = require('lodash'); 
// Connection string handed to QueryBuilder.connect at the bottom of this file.
const URL = 'my database url';
// True for every NODE_ENV value except 'production'; gates the connection log.
const __DEV__ = !(process.env.NODE_ENV === 'production');

class QueryBuilder { 
    _db$: Observable; 
    _selectors: Object; 

    constructor(db$: Observable, selectors?: Object) { 
    this._db$ = db$; 

    if (!selectors) { 
     this._selectors = assign({ 
     collection: null, 
     query: {}, 
     opts: {}, 
     sort: {}, 
     offset: 0, 
     limit: 0 
     }, selectors); 
    } 
    else { 
     this._selectors = selectors; 
    } 
    } 

    static connect(url: string): QueryBuilder { 
    var connect = Observable.fromNodeCallback(client.connect, client); 
    var db$ = connect(db.url); 

    db$.subscribe(
     _ => { 
     if (__DEV__) { 
      console.log('Connected to database on', url); 
     } 
     }, 
     err => console.log('Database connection error:', err.message, err.stack) 
    ); 

    return new QueryBuilder(db$); 
    } 

    close(): any { 
    this._db$.dispose(); 

    return QueryBuilder; 
    } 

    collection(name: string): QueryBuilder { 
    var ss = assign({}, this._selectors, { collection: name }); 
    return new QueryBuilder(this._db$, ss); 
    } 

    select(query?: Object = {}, opts?: Object = {}): QueryBuilder { 
    var ss = assign({}, this._selectors, { query, opts }); 
    return new QueryBuilder(this._db$, ss); 
    } 

    selectOne(query?: Object = {}, opts?: Object = {}): QueryBuilder { 
    var ss = assign({}, this._selectors, { 
     query: query, 
     opts: opts, 
     limit: 1 
    }); 
    return new QueryBuilder(this._db$, ss); 
    } 

    sort(sort: Object): QueryBuilder { 
    var ss = assign({}, this._selectors, { sort }); 
    return new QueryBuilder(this._db$, ss); 
    } 

    skip(offset: number): QueryBuilder { 
    var ss = assign({}, this._selectors, { offset }); 
    return new QueryBuilder(this._db$, ss); 
    } 

    limit(limit: number = 0): QueryBuilder { 
    var ss = assign({}, this._selectors, { limit }); 
    return new QueryBuilder(this._db$, ss); 
    } 

    exec(): Observable { 
    var ss = this._selectors; 
    var db$ = this._db$ 

    if (!ss.collection) return Observable.throw('You have to provide collection name.'); 
    if (!db$) return Observable.throw('No db connection found.'); 

    var o = db$.flatMapLatest(db => { 
     var c = db.collection(ss.collection); 
     var cursor = c.find(ss.query, ss.opts).sort(ss.sort).skip(ss.offset).limit(ss.limit); 
     var obs = Observable.fromNodeCallback(cursor.toArray, cursor); 
     return obs(); 
    }); 

    return ss.limit === 1 ? o.map(res => res[0]) : o; 
    } 
} 

// Connect once when this module is first loaded and export the resulting
// builder as the module's value.
const rootBuilder = QueryBuilder.connect(URL);
module.exports = rootBuilder;

這裏的問題出在哪裏?

回答

1

您在 connect 方法中泄漏了訂閱(subscription),而且您 dispose 的是 observable,而不是訂閱。

class QueryBuilder { 
    _db$: Observable; 
    _sub: Disposable; 
    _selectors: Object; 

    constructor(db$: Observable, selectors?: Object) { 
    this._db$ = db$; 

    this._sub = db$.subscribe(
     _ => { 
     if (__DEV__) { 
      console.log('Connected to database on', url); 
     } 
     }, 
     err => console.log('Database connection error:', err.message, err.stack) 
    ); 

    if (!selectors) { 
     this._selectors = assign({ 
     collection: null, 
     query: {}, 
     opts: {}, 
     sort: {}, 
     offset: 0, 
     limit: 0 
     }, selectors); 
    } 
    else { 
     this._selectors = selectors; 
    } 
    } 

    static connect(url: string): QueryBuilder { 
    var connect = Observable.fromNodeCallback(client.connect, client); 
    var db$ = connect(db.url); 

    return new QueryBuilder(db$); 
    } 

    close(): any { 
    this._sub.dispose(); 

    return QueryBuilder; 
    } 
} 

(未測試)

相關問題