2017-05-03 16 views
0

運行下面的代碼subscribeOn排除repeatWhen不起作用subscribeOn

PublishSubject<Void> repeatSubject = PublishSubject.create(); 

Observable. 
    <Integer>create(subscriber -> { 
     System.out.println("1"); 
     subscriber.onNext(1); 
     subscriber.onCompleted(); 
    }). 
    //subscribeOn(Schedulers.computation()). 
    repeatWhen(h -> h.flatMap(nothing -> repeatSubject)). 
    subscribe(); 

repeatSubject.onNext(null); 

版畫,一如預期,兩個 '1'

但是,因爲只有我取消註釋subscribeOn,所以只打印了one'1'。必須是repeatWhen不會重新訂閱可觀察。爲什麼?

P.S.感謝好解釋yosriz。所以現在問題不是'爲什麼',而是'如何'。我怎麼能從主線程重複觀察,安排在另一個線程?

P.P.S.以下修改仍然打印一個'1'。

BehaviorSubject<Void> repeatSubject = BehaviorSubject.create(); 
    Observable. 
      <Integer>create(subscriber -> { 
       System.out.println("1"); 
       subscriber.onNext(1); 
       subscriber.onCompleted(); 
      }) 
      .subscribeOn(Schedulers.computation()) 
      .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() { 
       @Override 
       public Observable<?> call(Observable<? extends Void> observable) { 
        return observable.zipWith(repeatSubject, new Func2<Void, Void, Object>() { // called on main thread 
         @Override 
         public Object call(Void aVoid, Void aVoid2) { 
          return new Object(); // never called ! 
         } 
        }); 
       } 
      }) 
      .subscribe(); 
    repeatSubject.onNext(null); // called on main thread 

回答

2

這是因爲在第一種情況下,當你沒有指定Scheduler,在訂閱立即發生在這個線程阻斷的方式,所以,當訂閱了repeatSubjectrepeatWhen,它不排放呢。所以你打電話repeatSubject.onNext(null);flatMap發生。
這意味着首先flatMap訂閱了repeatSubject,然後只有onNext(null)被調用,repeatWhen重新訂閱源Observable

當您指定的調度,所有Observable序列得到安排在一個單獨的線程,所以repeatSubject.onNext(null);威力訂閱了repeatSubject的flatMap之前發生,(你沒有任何保證的兩個操作發生在兩個不同的線程中),所以它錯過了通知,因此重複不起作用。

Observable. 
      <Integer>create(subscriber -> { 
       System.out.println("1"); 
       subscriber.onNext(1); 
       subscriber.onCompleted(); 
      }). 
      subscribeOn(Schedulers.computation()). 
      repeatWhen(h -> h.flatMap(nothing -> { 
       System.out.println("repeatWhen flatMap"); 
       return repeatSubject; 
      })). 
      subscribe(); 

    System.out.println("repeatSubject.onNext"); 
    repeatSubject.onNext(null); 

編輯:

如何重複:

可以以一起同步既完成發射使用zip

您可以通過添加日誌觀察行爲和你的Subject.onNext()排放,以這種方式,每個onNext()將重複序列中的一個時間:

 Observable. 
      <Integer>create(subscriber -> { 
       System.out.println("1"); 
       subscriber.onNext(1); 
       subscriber.onCompleted(); 
      }) 
      .subscribeOn(Schedulers.computation()) 
      .repeatWhen(completeNotfications -> 
        completeNotfications.zipWith(repeatSubject, (o, aVoid) -> new Object())) 
      .subscribe(); 
+0

感謝解釋!但是,我怎麼能在主線程上重複另一個線程上的可觀察性? – Alexey

+0

什麼是不完全的工作不工作:( – Alexey

+0

?我測試了它 – yosriz