2016-10-04 37 views
1

我已經創建了測試Observables兩個函數傳遞數據,每個返回Observable:這樣一種更好的方式,從一個可觀察到另一個

foo() { 
    return new Observable(observer => { 
    let i = 0; 
    setInterval(() => { 
     if(i === 10) { 
     observer.complete(); 
     } else { 
     observer.next(i); 
     i++; 
     } 
    }, 1000); 
    // If I call observer.complete() here then it never completes 
    }); 
} 

bar(fooResult) { 
    return new Observable(observer => { 
    let j = 0; 
    setInterval(() => { 
     if(fooResult) { 
     observer.next('j -> '+j+' plus fooResult '+JSON.stringify(fooResult)); 
     observer.complete(); 
     } else { 
     observer.next(j); 
     j++; 
     } 
    }, 2000); 
    }); 
} 

,並利用其中的一部分:

let fooResult = []; 

    // Testing observables... 
    this.exampleProductService.foo().subscribe(
     (res) => { 
     console.log('foo next() -> '+res); 
     fooResult.push(res); 
     }, 
     (err) => { 
     console.error('foo error: '+JSON.stringify(err)); 
     }, 
    () => { 
     console.log('foo finished'); 
     this.exampleProductService.bar(fooResult).subscribe(
      (res) => { 
      console.log('bar next() -> '+res); 
      }, 
      (err) => { 
      console.error('bar error: '+JSON.stringify(err)); 
      }, 
     () => { 
      console.log('bar finished'); 
      } 
     ); 
     } 
    ); 

提出問題的人:

  1. 有沒有更好的方法來傳遞數據完成另一個函數的Observable,該函數也返回一個Observable?建立一個數組似乎麻煩,我不能做以下爲Observablecomplete callback部分不傳遞參數一樣progressUpdateonError

    (complete) => { this.exampleProductService.bar(complete).// rest of code } 
    
  2. 我試圖指派第一功能的結果一個變量,然後傳遞該變量,但如預期的那樣,我得到了一個Observable,而不是我想要的結果。

  3. 關於我如何進行上述操作,有什麼不正確的嗎?

感謝

附:這是一個Angular 2應用程序!

回答

3

我認爲你的功能有點過於複雜。例如,在工廠功能已經可用的情況下,不要使用構造函數,在這種情況下,請指定intervaltimer,儘管在這種情況下它會更加冗長。

其次,bar函數並沒有什麼實際意義,因爲您似乎正在等待您已經完成的某些已完成的任務(因爲您沒有訂閱它,直到完成由foo生成的Observable)。

我重構根據您陳述的目標等待一個Observable完成之前開始第二個,同時使用第二個第一個的結果。

// Factory function to emit 10 items, 1 every second 
function foo() { 
    return Observable.interval(1000) 
    .take(10); 
} 

// Lifts the passed in value into an Observable and stringfys it 
function bar(fooResult) { 
    return Rx.Observable.of(fooResult) 
    .map(res => JSON.stringify(fooResult)) 
} 

現在使用它們時,你會怎麼做,而不是:

上面我
foo() 
    // Log the side effects of foo 
    .do(
    x => console.log(`foo next() -> ${x}`), 
    err => console.error(`foo error: ${JSON.stringify(err)}`), 
    () => console.log('foo finished') 
) 
    // Collect the results from foo and only emit when foo completes 
    .reduce((total, diff) => [...total, diff], []) 

    // Pass the result from the reduce on to bar 
    .concatMap(fooResult => bar(fooResult)) 

    //Subscribe to the results of bar 
    .subscribe(
    res => console.log(`bar next() -> ${res}`), 
    err => console.error(`bar error: ${JSON.stringify(err)}`), 
    () => console.log('bar finished') 
); 

通知我也擺脫全球性的狀態,這是函數式編程的詛咒的。哪裏有可能你的狀態應該被本地化到流中。

看到這裏的工作示例:http://jsbin.com/pexayohoho/1/edit?js,console

+0

感謝feedbacl保羅。在這種情況下,您如何特別擺脫全球狀態? – Katana24

+0

另外 - 你能解釋一下這條線多一點嗎? .reduce((total,diff)=> [... total,diff],[])從文檔中,它充當Observable上的累加器,並在我完成時返回一個值。我明白,但可選的種子提供(差異)是什麼,以及[...總,差異]是什麼意思 - 我以前從來沒有見過這樣的:全局狀態 – Katana24

+1

,不再有一個全局數組叫做' fooResult'。 re:語法,'[... total,diff]'是'total.concat([diff])'的ES6短手,基本上它會創建一個新的數組副本加上新的項目以避免變化現有的項目。 'diff'是進入的新值,而不是可選的種子,它是作爲一個空數組'[]'提供的。這是否回答你的問題? – paulpdaniels

1

我不確定你想要達到什麼效果,但這裏有一個jsbin that試圖複製你的代碼。

幾點需要注意:

您FOO()是非常容易使用Observable.timer和。取()操作符來創建。您的酒吧()可以由另一個計時器與地圖和.takeWhile()運算符一起使用。

至於最後一次訂閱(完整部分),它只會打印'foo finished',但沒有別的,因爲它訂閱了一個已經終止的非緩衝序列。

+0

感謝您的反饋梅厄 - 我試圖實現是通過從一個可觀察到另一個輸出的方式。我喜歡你的例子中foo部分的簡潔性 – Katana24

相關問題