2016-11-28 55 views
3

場景:如何獲得不改變流的副作用訂閱?

我有一個數據列表(業務交易)。該數據被視爲失效如果滿足以下兩個條件:

  • 用戶(UI /視圖)訂閱該數據源(一個rx.Observable<List<Transaction>>
  • > = 5分鐘自上次網絡同步已通過。該值被序列化並從數據庫中觀察。

注:如果UI /視圖未訂閱的數據源,> = 55分鐘已經過去了,我不想同步的最新數據(因爲沒有人聽)

一些下面的代碼示例的詳細信息:

  • data.observe()返回rx.Observable<List<Transaction>>
  • void syncIfLast5Minutes()將勢在必行檢查是否出現了同步在最後五分鐘 - 如果沒有,那麼一個新的網絡請求將被執行抓取最新數據

現在,我可以很容易地做一些事情,如:

data.observe() 
    .doOnSubscribe(transactions -> syncIfLast5Minutes() 

但是,如果數據是由UI的初始訂閱過期,這將只檢查。如果用戶界面在5分鐘後仍然訂閱,自動刷新將不會在這裏觸發(但我希望這樣做)。

我尋找副作用操作者將

  • 不會影響原來的流時的數據是陳舊
  • 訂閱啓動定時器,將監視(> = 5分鐘已經過去)
  • unsusbscribing取消此計時器

是否有RxJava一個慣用的方式來被動地實現這一目標?

回答

0

您可以通過合併您的流與Observable.interval來執行您的副作用(刷新數據),並將ignoreElements應用於此。

這是一個工作示例:

import java.util.concurrent.TimeUnit; 

import rx.Observable; 
import rx.subjects.PublishSubject; 

public class Main { 

    public static void main(String[] args) throws InterruptedException { 
     PublishSubject<String> data = PublishSubject.create(); 
     //refresh data every second for this demo 
     //for your use case it's every 5 minutes 
     data.mergeWith(Observable.interval(1, TimeUnit.SECONDS) 
       // cause timed side-effect that updates data stream 
       .doOnNext(n -> data.onNext(n + "")) 
       .ignoreElements() 
       .cast(String.class)) 
      .doOnNext(System.out::println) 
      .toBlocking() 
      .subscribe(); 
    } 

} 

如果多個用戶使用的是流你可能想看看.share所以只有一個刷新動作發生的每5分鐘。

+0

如果我每毫秒發光,是不是很多的對象創建?注意:我堅持數據上次刷新的時間,因此必須在初始訂閱時進行檢查。我還爲我的問題添加了一些代碼 – ZakTaccardi

+0

對不起,我的例子應該是1秒。你的情況是5分鐘。 –

+0

這個想法是刷新日程安排,而不是重複檢查陳舊的數據。如果'interval'不適合您的需要,您可能還想考慮使用'Observable.timer'。 –