2014-08-27 55 views
1

我已經刪除的樣板去點rxjs zip不是懶惰?

// a.js

// My observables from stream and event 
this.a = Rx.Node.fromStream(this.aStream()); 
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem'); 

// Zip 'em 
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) { 
    return {item: s2, a: s1.toString()}; 
}); 

// Streams the lowercase alphabet 
rb.prototype.aStream = function aStream() { 
var rs = Readable(); 

var c = 97; 
rs._read = function() { 
    rs.push(String.fromCharCode(c++)); 
    console.log('Hit!'); 
    if (c > 'z'.charCodeAt(0)) { 
     rs.push(null); 
    } 
}; 

return rs; 
}; 

// b.js(需要上面導出模塊)

rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...) in the module to trigger the itemSource observable 

我期望看到:

{item: 'a', a: 'a'}印在控制檯

發生了什麼事:

Hit!{item: 'a', a: 'a'}之前印刷24次。這意味着zip已從aStream中取得所有值,將它們緩存起來,然後執行它應該執行的操作。

我該如何獲得相同的功能zip優惠但懶惰?我的目標是使用無限流/可見性,並將其與有限(異步)壓縮在一起。通過可運行

編輯

查看/編輯:RX Zip test編輯基於答案更新2代碼 - >沒有輸出了。

+0

請加回樣板和簡化的例子。 – 2014-08-27 12:50:58

+0

http://www.yoda.arachsys.com/csharp/complete.html – 2014-08-27 12:51:23

+0

@DaveSexton請參閱:http://pastebin.com/mnc82KuV和http://pastebin.com/8HxURWYc複製/粘貼/運行版本- 謝謝!我不認爲這個例子可以更簡化。這是2個流和zip功能。我已經包含了我用作參考的那個流,它可能已被排除,忽略該部分是安全的,但認爲它可能是有用的。 – rollingBalls 2014-08-27 13:04:04

回答

1

zip確實很懶。它只訂閱ab,並且在產生新值時進行工作。

您的問題是fromStream只要zip訂閱了它,它就會同步發送所有值。發生這種情況是因爲您的自定義Readable經常說「有更多數據可用!」

使你的Readable異步,你會得到所需的行爲。

嘗試這樣的事情(未經測試)

var rs = Readable(); 
var subscription = null; 
rs._read = function() { 
    if (!subscription) { 
     // produce the values once per second 
     subscription = Rx.Observable 
      .generateWithRelativeTime(
       97, // start value 
       function (c) { return c > 'z'.charCodeAt(0); }, // end condition 
       function (c) { return c + 1; }, // step function 
       function (c) { return String.fromCharCode(c); }, // result selector 
       function() { return 1000; }) // 1000ms between values 
      .subscribe(
       function (s) { 
        rs.push(s); 
        console.log("Hit!"); 
       }, 
       function (error) { rs.push(null); }, 
       function() { rs.push(null); }); 
    } 
}; 
+0

謝謝你的回答。我更新了可運行的小提琴來使用你的代碼,它只是輸出「完成」,現在沒有其他的事情發生。另外,即使我們通過在內部修改這個特定的流來解決這個問題,我的問題是,是否有辦法採取任意流並使其不會溢出zip(以便我可以在任何項目中使用該模式)? – rollingBalls 2014-08-27 14:52:01

+0

我接受這個答案。在瞭解了更多關於Reactive Programming的知識之後,我想我需要重構我的方法。 – rollingBalls 2014-08-30 12:54:36