2017-06-23 358 views
1

我正在學習RxJava異步訂閱是如何工作的。但有些問題讓我困惑。RxJava異步訂閱將丟失消息

@Test public void testCreateAsync() throws InterruptedException { 
    Observable<String> observable = Observable.create(emitter -> { 
    for (int i = 0; i < 10; i++) { 
     if (!emitter.isDisposed()) { 
     int finalI = i; 
     new Thread(() -> emitter.onNext("value_" + finalI)).start(); 
     } 
    } 
    if (!emitter.isDisposed()) { 
     emitter.onComplete(); 
    } 
    }); 

    observable.subscribe(System.out::println); 
} 

上面的代碼工作正常,並打印value_1value_9。但是,當我前面加上睡眠認購,最後的消息value_9將不被打印出來,像這樣:

@Test public void testCreateAsync() throws InterruptedException { 
    ... 
    Thread.sleep(3000); 
    observable.subscribe(System.out::println); 
} 

任何討論這個問題是升值。

ps:java版本是1.8,而RxJava版本是2.1.1

回答

0

有兩個問題:

  1. 主叫onNext來自不同線程先前線程能夠發出其值以防止任何進一步的onNext排放之前同時
  2. onComplete可以執行。

睡眠沒有特別的效果,反覆執行上面的代碼有時會產生行爲。

(我不清楚你想用這個設置達到什麼目的)。

+0

這實際上沒有意義,只是一個演示。感謝您的回答。我忽略了併發問題。 – lovexiaov