2014-12-11 71 views
0

我以下列方式使用rxjava維護任務的定時器發出的結果:如何使用可觀察與flatMap

在一類,我需要定期維護,我用下面的靜態訂閱,這導致可觀測當班級被加載到內存中時,首次啓動,然後按照指定的時間間隔進行。

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS) 
     .flatMap(new Func1<Long, Observable<String>>() { 
      @Override public Observable<String> call(Long aLong) { 

       // some code 

       return Observable.just(null); 
      } 
     }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()) 
     .subscribe(); 

現在我有一個情況,在這裏我要舉報我維護用戶界面的結果。

通常情況下,我會用下面的架構

Observable.create(new Observable.OnSubscribe<String>() { 

     @Override public void call(Subscriber<? super String> subscriber) { 

      // some code 

      subscriber.onNext(result); 
      subscriber.onCompleted();   } 

    }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Action1<String>() { 

       @Override public void call(String result) { 

        // write to the UI 

       } 

      }); 

但在這裏,我們有剛剛執行一次可觀察到的。

對於定期執行的Observable,我找不到在Subscriber中調用Action的方法,以便我可以使用subscriber.onNext()傳遞結果。看起來Observable沒有合適的簽名,它可以在timer()中佔用很長的時間,同時允許使用action來訂閱。但知道rxjava我相信有一個技巧;-)

我可以使用zip壓縮一個Timer Observable和一次性Observable(基本上壓縮兩個版本在一起),但我寧願使用第一個結構,因爲它表現稍有不同。

-

我試着按以下方式兩個版本合併成一個:

private static Subscription subscription = Observable.timer(0, 5, TimeUnit.SECONDS) 
     .flatMap(new Func1<Long, Observable<String>>() { 
      @Override public Observable<String> call(Long aLong) { 

       // some code // stays here to ensure there is no concurrency while executing 

       final String result = "result"; // I store the result in a final variable after some code has been finished 

       return Observable.create(new Observable.OnSubscribe<String>() { 
        @Override public void call(Subscriber<? super String> subscriber) { 
         subscriber.onNext(result); // then I use it in a new Observable and emit it 
         subscriber.onCompleted(); // not sure if this is needed here (haven't tested this yet) 
        } 
       }); 

      } 
     }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()) 
     .subscribe(new Action1<String>() { 
      @Override public void call(String result) { 
       // so I can finally consume the result on the UI thread 
      } 
     }); 

不是創建和發射「空」可觀察的,我創建了一個,讓我送導致訂戶。

很麻煩,但這應該工作,對吧?任何簡單的解決方案?你怎麼看?

+0

爲什麼'Observable.timer.flatMap'不起作用? – zsxwing 2014-12-11 09:01:50

+0

我相信這是行不通的,因爲flatMap將其發射的Observable變成一個單一的Observable(我想要的),但是,我沒有找到一個簽名,它給了我一個與訂閱者的接口[就像@Override public void call (Subscriber <?super String> subscriber)],所以我可以使用subscriber.onNext()。 – 2014-12-11 15:30:56

回答

1

我不是很想理解你的問題。

你想從你的Observable Timer和你的其他Observable使用相同的訂閱嗎?

也許你可以使用主體作爲你的Observables之間的橋樑。

Subject<?, ?> bridge = PublishSubject.create(); 
Observable.timer(5, SECONDS).flatMap(/* whatever*/).mergeWith(bridge).subscribe(/* toDo */); 
Observable.create(/* subscription */).subscribe(bridge); 

你「有一次觀察的」發出的每個項目將被推入brige是,將其推入「計時器」觀測。

這是你在找什麼?

+0

[我從未使用過主題/橋樑,所以我需要先閱讀並試用,然後才能說出它是否有效。我會在一段時間內回到這個頁面。] – 2014-12-11 15:33:12

+0

我已經在上面添加了一個版本,我已經合併了這兩個代碼片段。沒有測試,我相信這應該工作。通過PublishSubject,我認爲我們也處於一個很好的方式,也許能夠比上面做的更簡單地解決問題。但是從你的解釋中可以看出,「每一件物品都會被你的」一次可觀察「發射出去,將會被推入布里奇,這將推動它進入」計時器「可觀測的狀態。它看起來像你誤解了,因爲我不想結合兩個Observables。我只打算使用Timer Observable作爲時鐘,我不需要它的任何值。 – 2014-12-11 17:07:11

+0

好的。也許你想把你的計時器與其他可觀察的東西拉在一起。無論如何,我想你明白了這個訣竅。 ;) – dwursteisen 2014-12-11 17:10:36