2015-11-30 87 views
1

我已經多次需要這樣的構建,而且我不知道如何處理它。我的問題是:當發生A時,我想創建一個複雜的可觀察(通過組合幾個運算符)。它將異步完成一些操作,發佈結果並完成。同時,我希望允許對這個可觀察對象進行新的訂閱,但是一旦它完成,就應該創建新的可觀察對象,這是第一個觀察對象的副本(或者只是做同樣的事情)。訂閱現有的可觀察物體,除非它已完成

(編輯)作爲一個例子,讓我們來看一個簡單的觀測值:Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS)。我的目標是針對以下行爲:

[毫秒:行動]

0:obs.subscribe(...) - 謹以此觀察到後〜完成1S

500:obs.subscribe(...) - 這應該後完成〜500ms的

950:原始觀察到的應該已經完成​​:如上所述,50ms的後

1500應完成。我現在想重新開始一切,之後1秒

2000這裏完全有訂閱:在這裏我想連接到連接到最新鮮的觀察到的,並期望它500S後完成(由於新的第二始於1500算起)

我不知道如何以正確和線程安全的方式來做到這一點。我可以用一個可觀察的事物做到嗎?

+0

一種模糊的描述,但聽起來像你需要'分享'。 – akarnokd

+0

@akarnokd我看到「分享」會如何幫助我分享可觀察的內容。那麼當前一個完成後,我將如何重新創建可觀測值?我編輯了我的問題,使問題更加清晰 – wasyl

回答

2

您可以使用defershare來實現此目的。

Observable<Long> o = Observable.defer(() -> 
    Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS)) 
.share(); 

o.subscribe(System.out::println); // T = 0 
Thread.sleep(500); 
o.subscribe(System.out::println); // T = 500 
Thread.sleep(450); 
o.subscribe(System.out::println); // T = 950 

Thread.sleep(550); 
o.subscribe(System.out::println); // T = 1500 
Thread.sleep(500); 
o.subscribe(System.out::println); // T == 2000 

Thread.sleep(1000); 

前3將完成在1s和第二後兩位相同的時間(具有相同的值)將完成第一批後1.5秒(具有不同的值到所述第一)。

+0

非常感謝!我其實已經寫了幾乎完全相同的測試用例,但既然你發佈了答案,我會很樂意接受它。 – wasyl