2015-05-17 87 views
1

我使用observeOn觀察可觀察到在另一個線程:RxJava異步觀察者錯過元素

Observable.just("Hello", "world!").observeOn(Schedulers.io()).subscribe(System.out::println); 

但是這個代碼將不會總是輸出!「世界,你好」。與PublishSubject相同:

PublishSubject<String> subject = PublishSubject.create(); 
subject.observeOn(Schedulers.io()).subscribe(System.out::println); 
subject.onNext("Hello"); 
subject.onNext("world!"); 

爲什麼此代碼不總是打印「Hello world!」 ?我認爲至少在第二個示例中,訂閱會收到兩條消息,因爲它在onNext調用之前訂閱。有沒有辦法接收這兩個消息?

+0

你真的想在這裏使用Schedulers.io()嗎?根據文檔,它只能用於IO操作,如文件讀取或網絡。 – Stepango

+0

從潛在問題的角度來看,使用io()是無關緊要的。它的目的是阻止操作,通常是IO,但沒有阻止你對它們進行計算。 – akarnokd

回答

0

底層RxJava調度程序的線程都是守護線程,如果所有其他非守護線程都完成,它們將被JVM停止。如果您從static void main()運行示例,很可能您的main()方法將在其他線程有機會運行並因此不執行打印代碼之前終止。

根據您要如何觀察一個序列,您可以在可觀測鏈使用toBlocking()或在您的onCompleted()方法實現使用CountDownLatchcountDown()

+0

謝謝,這確實是問題!我用theCountDownLatch來防止這個問題。但是我不確定我是否理解toBlocking的語義。這是否使調用線程阻塞,直到onCompleted被調用? – Hadriana

+0

BlockingObservable上的方法在當前線程上執行並且本質上是阻塞的。一些窮舉方法將保持阻塞,直到上游終止onComplete或onError。 – akarnokd

0

IO調度程序需要一些時間才能做好準備。

當你用onNext發射物品時,它還沒有準備好,所以你錯過了它。

+0

這個回答有點令人誤解:如果給予足夠的時間,他們將在沒有數據丟失的情況下處理onNext,但是示例程序在處理髮生之前退出。 – akarnokd