我試圖將現有的API轉換爲與RxJS一起工作...對於節點來說相當新,對RxJs也很新,所以請耐心等待。試圖讓我自己的RxJs可觀察
我有一個現有的API(getNextMessage),它可以阻塞(異步),或者當某些東西變得可用時,通過節點樣式(err,val)回調返回一個新的項目或錯誤。
所以它看起來是這樣的:
getNextMessage(nodeStyleCompletionCallback);
當服務器響應時,您可以將getNextMessage視爲一個http請求,即將來完成,但您需要在收到消息後再次調用getNextMessage,以不斷從服務器獲取新項目。因此,爲了使它成爲一個可觀察的集合,我必須讓RxJs繼續調用我的getNextMessage函數,直到訂閱者處置完畢();否則,我們不得不調用getNextMessage函數。
基本上,我試圖創建自己的RxJs可觀察集合。
的問題是:
- 我不知道如何使subscriber.dispose()殺死async.forever
- 我可能不應該擺在首位 使用async.forever
- 我不確定我應該爲每條消息甚至得到'完成' - 不應該是在序列的末尾
- 我想最終刪除使用fromNodeCallback的需要,以獲得第一個RxJS級可觀察
- 顯然我有點困惑。
想得到一點幫助,謝謝!
這裏是我現有的代碼:
var Rx = require('rx');
var port = require('../lib/port');
var async = require('async');
function observableReceive(portName)
{
var observerCallback;
var listenPort = new port(portName);
var disposed = false;
var asyncReceive = function(asyncCallback)
{
listenPort.getNextMessage(
function(error, json)
{
observerCallback(error, json);
if (!disposed)
setImmediate(asyncCallback);
}
);
}
return function(outerCallback)
{
observerCallback = outerCallback;
async.forever(asyncReceive);
}
}
var receive = Rx.Observable.fromNodeCallback(observableReceive('rxtest'));
var source = receive();
var subscription = source.forEach(
function (json)
{
console.log('receive completed: ' + JSON.stringify(json));
},
function (error) {
console.log("receive failed: " + error.toString());
},
function() {
console.log('Completed');
subscription.dispose();
}
);
非常感謝您! 我自己也想出了它的一半,儘管我沒有得到儘可能多的異步。 現在我可以建立自己的第一類可觀察物,這是一個美麗的世界。順便說一句,這是一個很好的例子 - 應該放在RxJs文檔的某處。 再次感謝。 – user3291110