2014-02-10 28 views
5

我試圖將現有的API轉換爲與RxJS一起工作...對於節點來說相當新,對RxJs也很新,所以請耐心等待。試圖讓我自己的RxJs可觀察

我有一個現有的API(getNextMessage),它可以阻塞(異步),或者當某些東西變得可用時,通過節點樣式(err,val)回調返回一個新的項目或錯誤。

所以它看起來是這樣的:

getNextMessage(nodeStyleCompletionCallback);

當服務器響應時,您可以將getNextMessage視爲一個http請求,即將來完成,但您需要在收到消息後再次調用getNextMessage,以不斷從服務器獲取新項目。因此,爲了使它成爲一個可觀察的集合,我必須讓RxJs繼續調用我的getNextMessage函數,直到訂閱者處置完畢();否則,我們不得不調用getNextMessage函數。

基本上,我試圖創建自己的RxJs可觀察集合。

的問題是:

  1. 我不知道如何使subscriber.dispose()殺死async.forever
  2. 我可能不應該擺在首位
  3. 使用async.forever
  4. 我不確定我應該爲每條消息甚至得到'完成' - 不應該是在序列的末尾
  5. 我想最終刪除使用fromNodeCallback的需要,以獲得第一個RxJS級可觀察
  6. 顯然我有點困惑。

想得到一點幫助,謝謝!

這裏是我現有的代碼:

var Rx = require('rx'); 
var port = require('../lib/port'); 
var async = require('async'); 

function observableReceive(portName) 
{ 
    var observerCallback; 
    var listenPort = new port(portName); 
    var disposed = false; 

    var asyncReceive = function(asyncCallback) 
    { 
     listenPort.getNextMessage(
      function(error, json) 
      { 
       observerCallback(error, json); 

       if (!disposed) 
        setImmediate(asyncCallback); 
      } 
     ); 
    } 

    return function(outerCallback) 
    { 
     observerCallback = outerCallback; 
     async.forever(asyncReceive); 
    } 
} 

var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest')); 
var source = receive(); 

var subscription = source.forEach(
    function (json) 
    { 
     console.log('receive completed: ' + JSON.stringify(json)); 
    }, 
    function (error) { 
     console.log("receive failed: " + error.toString()); 
    }, 
    function() { 
     console.log('Completed'); 
     subscription.dispose(); 
    } 
); 

回答

14

因此,這裏可能是我會做什麼。

var Rx = require('Rx'); 

// This is just for kicks. You have your own getNextMessage to use. ;) 
var getNextMessage = (function(){ 

    var i = 1; 

    return function (callback) { 
    setTimeout(function() { 
     if (i > 10) { 
     callback("lawdy lawd it's ova' ten, ya'll."); 
     } else { 
     callback(undefined, i++); 
     } 
    }, 5); 
    }; 

}()); 

// This just makes an observable version of getNextMessage. 
var nextMessageAsObservable = Rx.Observable.create(function (o) { 
    getNextMessage(function (err, val) { 
    if (err) { 
     o.onError(err); 
    } else { 
     o.onNext(val); 
     o.onCompleted(); 
    } 
    }); 
}); 

// This repeats the call to getNextMessage as many times (11) as you want. 
// "take" will cancel the subscription after receiving 11 items. 
nextMessageAsObservable 
    .repeat() 
    .take(11) 
    .subscribe(
    function (x) { console.log('next', x); }, 
    function (err) { console.log('error', err); }, 
    function() { console.log('done');  } 
); 
+2

非常感謝您! 我自己也想出了它的一半,儘管我沒有得到儘可能多的異步。 現在我可以建立自己的第一類可觀察物,這是一個美麗的世界。順便說一句,這是一個很好的例子 - 應該放在RxJs文檔的某處。 再次感謝。 – user3291110

5

我意識到這是一歲多,但我認爲這是一個更好的解決辦法是使用遞歸調度代替:

Rx.Observable.forever = function(next, scheduler) { 
    scheduler = scheduler || Rx.Scheduler.default, 
    //Internally wrap the the callback into an observable 
    next = Rx.Observable.fromNodeCallback(next);  

    return Rx.Observable.create(function(observer) { 
    var disposable = new Rx.SingleAssignmentDisposable(), 
     hasState = false; 
    disposable.setDisposable(scheduler.scheduleRecursiveWithState(null, 
     function(state, self) { 
     hasState && observer.onNext(state); 
     hasState = false; 
     next().subscribe(function(x){ 
      hasState = true; 
      self(x); 
     }, observer.onError.bind(observer)); 

     })); 

    return disposable; 

    }); 

}; 

這裏的想法是,你可以安排新一旦前一個項目完成後,項目。你可以調用next()來調用傳入的方法,當它返回一個值時,你調度下一個調用項。

然後,您可以使用它,像這樣:

Rx.Observable.forever(getNextMessage) 
.take(11) 
.subscribe(function(message) { 
console.log(message); 
}); 

看到一個工作示例here