2016-10-17 29 views
1

我目前正試圖找出一種方法來暫停觀察。在github看到blesh的帖子後,我想我正走在正確的軌道上。但由於某種原因,我的觀察者的takeWhile()在我從我的pauser主題執行switchMap()後被忽略。使用mergeMap()與主題忽略takeWhile()在我正在合併

這兩端正確的:

export class CompositionService { 
    cursor = -1; 
    pauser = new Subject(); 
    interval; 
    init = (slides) => { 
     let waitUntil = 0; 
     return this.interval = Observable 
      .range(0, slides.length) 
      .mergeMap((i) => { 
       let next = Observable.of(i).delay(waitUntil); 
       waitUntil += !!slides[i]["duration"] ? slides[i]["duration"] : 0; 
       return next; 
      }) 
      .scan((cursor) => { 
       return this.cursor = cursor = slides[cursor + 1] ? cursor + 1 : -1; 
      }, this.cursor) 
      .map(cursor => slides[cursor]) 
      .takeWhile((slide) => { 
       return !!slide; 
      }); 

    }; 
    // these methods are not called for this sample 
    play =() => { 
     this.pauser.next(false); 
    }; 
    pause =() => { 
     this.pauser.next(true); 
    }; 
}; 

這樣叫時,此工作:

it("should subscribe to init", (done) => { 
    slides.forEach((slide, i) => { 
     if (slide.duration) { 
      slide.duration = slide.duration/100; 
     } 
    }); 
    composition.init(slides).subscribe(
     (slide) => { 
      console.log(slide); 
     }, 
     (err) => { 
      console.log("Error: " + err); 
     }, 
     () => { 
      done(); 
     }); 
}); 

雖然前面的例子可以作爲宣傳的間隔觀察永遠不會結束,當我加入一些「神奇」:

export class CompositionService2 { 
    cursor = -1; 
    pauser = new Subject(); 
    interval; 
    init = (slides) => { 
     let waitUntil = 0; 
     this.interval = Observable 
      .range(0, slides.length) 
      .mergeMap((i) => { 
       let next = Observable.of(i).delay(waitUntil); 
       waitUntil += !!slides[i]["duration"] ? slides[i]["duration"] : 0; 
       return next; 
      }) 
      .scan((cursor) => { 
       return this.cursor = cursor = slides[cursor + 1] ? cursor + 1 : -1; 
      }, this.cursor) 
      .map(cursor => slides[cursor]) 
      .takeWhile((slide) => { 
       return !!slide; 
      }); 
     return this.pauser 
      // leaving commented for clarity of the end game 
      // .switchMap(paused => paused ? Observable.never() : this.interval); 
      // however, not even a straight forward switchMap is yeilding the expected results 
      .switchMap(paused => this.interval);    
    }; 
    play =() => { 
     this.pauser.next(false); 
    }; 
    pause =() => { 
     this.pauser.next(true); 
    }; 
}; 

以這種方式調用:

it("should subscribe to init", (done) => { 
    slides.forEach((slide, i) => { 
     if (slide.duration) { 
      slide.duration = slide.duration/100; 
     } 
    }); 
    composition.init(slides).subscribe(
     (slide) => { 
      console.log(slide); 
     }, 
     (err) => { 
      console.log("Error: " + err); 
     }, 
     () => { 
      //I never get here!!!!! 
      done(); 
     }); 
    // kickstart my heart! 
    composition.play(); 
}); 

任何人有任何想法我在做什麼錯在這裏?

回答

1

您沒有完成外部流。在第一個版本中,當takeWhile完成流時,您正在完成。但是,一旦你嵌入switchMap的內部。你只能完成內層流,因爲外層流(一個Subject)從未完成。當這種情況變得平坦時,它對用戶來說就是一個永不終止的流。

如果你想讓它完成,你需要在某個時候終止流,例如:

composition.init(slides) 
    .take(3) 
    .subscribe(
    (slide) => { 
     console.log(slide); 
    }, 
    (err) => { 
     console.log("Error: " + err); 
    }, 
    () => { 
     //I never get here!!!!! 
     done(); 
    }); 

我不是超級確信的Rx其實是正確的工具,在這裏,因爲流是並非真的被設計爲「暫停」,因爲你實際上無法停止Observable繼續傳播。您可能已經注意到您在暫停之間存儲狀態時跳過的箍數,因此考慮使用生成器或其他庫(如IxJS)可能會有意義。但我離題了。