2016-12-28 92 views
1

我想在從observable接收到第一個項目後取消訂閱。它似乎不起作用。我究竟做錯了什麼?RxJava:OnNext取消訂閱不起作用

public class ObservableAndSubscriber { 

    public static void main(final String[] args) { 
     final Observable<String> strObservable = Observable.create(s -> { 
      while (true) { 
       s.onNext("Hello World!!"); 
      } 
     }); 

     final Subscriber<String> strSubscriber = new Subscriber<String>() { 

      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(final Throwable e) { 
       e.printStackTrace(); 

      } 

      @Override 
      public void onNext(final String t) { 
       System.out.println(t); 
       this.unsubscribe(); 

      } 
     }; 
     strObservable.subscribe(strSubscriber); 
    } 
} 

結果似乎在無限循環中打印「Hello World」。

+0

http://stackoverflow.com/q/30046124/697313 - 相關討論。 –

回答

0

在我讀Tomasz Nurkiewicz的書時,我做了這個例子。他後來有了一個類似的例子,並解釋了我的代碼出了什麼問題。

無限循環是邪惡的!因爲我有一個可觀察的創建lambda表達式,它在我的主線程的上下文中執行並無限期地訂閱塊。爲了讓可觀察的create lambda在不同的線程中執行,這意味着主線程永遠不會被阻塞,並且subscribe()完成。我問訂閱發生在一個IO線程上,並且做了這個竅門,這意味着無限循環發生在IO線程上,並且不會阻塞主線程。

strObservable.subscribeOn(Schedulers.io()).subscribe(strSubscriber); 
+0

我不認爲這將解決取消訂閱的問題。在'main()'末尾添加'Thread.sleep(1000)'以延遲程序終止,並看看它是如何發生的。 –

1

對於取消訂閱工作,您需要檢查循環中的訂閱狀態。否則,它將無限耗盡您的CPU。

處理無限序列最簡單的方法是利用庫提供的方法Observable.generate()Observable.fromIterable()。他們會爲你做適當的檢查。

此外請確保您訂閱並觀察不同線程。否則,您只能爲單個訂戶提供服務。

你舉的例子:

strObservable = Observable.generate(g -> g.onNext("Hello!")); 
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);