2015-09-17 66 views
1

我有一個長時間運行的進程在後臺正在處理文件並將其進度發佈到BehaviorSubjectsubject.onNext(progress)並且一旦它完成調用subject.onCompletedRxJava塊流直到其他觀察者完成

在流程中的某個點,我想訂閱該subject並獲得當前進度或等待其完成。

prepareOtherStuff() 
    .flatMap(validate()) 
    .????? <- want to subscribe here 
    .map(finalize()) 
    .subscribe() 

我遇到?????部分的麻煩。無法弄清楚如何阻止流並等待文件處理完成,並獲取文件處理進度以將其顯示給用戶。

例如:

-- other files already processed, don't care about them -- 
File 8 of 10 processed 
File 9 of 10 processed 
-- onCompleted received -- 

我如何能實現這樣的行爲?

回答

1

你可以使用.concatWith(其中TheType是通用型的可觀察到從.flatMap(validate())返回的):

prepareOtherStuff() 
    .flatMap(validate()) 
    .concatWith(subject 
      .doOnNext(m -> log.info(m)) 
      .ignoreElements() 
      .cast(TheType.class)) 
    .map(finalize()) 
    .subscribe()