2017-01-31 14 views
2

我有一些UI事件調用onNext()一個PublishSubject。用戶通常需要2秒才能完成其工作。我需要忽略所有對onNext()的呼叫,除了用戶忙時的最後一個呼叫。我嘗試了以下,但我無法控制流量。這些請求似乎排起了長隊,並且每一個請求都得到了處理(所以背壓似乎並不正常)。我怎樣才能讓它忽略所有的請求,但最後一個? (我不想使用debounce,因爲代碼需要立即作出反應,任何合理的小超時都不起作用)。RxJava主題與背壓 - 只讓最後一個值一旦發出下游已完成耗時

此外,我認識到使用subscribeOn與主題沒有效果,因此我使用observeOn在其中一個操作員做異步工作。這是正確的方法嗎?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized(); 

loadingQueue 
    .toFlowable(BackpressureStrategy.LATEST) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .map(discarded -> { 
    // PRE-LOADING 
    Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName()); 
    return discarded; 
    }) 
    .observeOn(Schedulers.computation()) 
    .map(b -> { 
    Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName()); 
    Thread.sleep(2000); 
    return b; 
    }) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(b -> { 
     Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n"); 
    }); 


loadingQueue.onNext(true); 
loadingQueue.onNext(true); 
loadingQueue.onNext(true); 
.... 

我看到的輸出是:

PRE-LOADING: main 
PRE-LOADING: main 
LOADING: RxComputationThreadPool-1 
PRE-LOADING: main 
PRE-LOADING: main 
PRE-LOADING: main 
PRE-LOADING: main 
PRE-LOADING: main 
PRE-LOADING: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 
FINISHED: main 

相反,我想到的代碼做以下(即負載一次,而它的負荷,背壓憋上的所有請求,併發出最後一個,一旦第一觀察者已經完成 - 所以在總應該理想地僅最多加載了兩次):

PRE-LOADING: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 

PRE-LOADING: main 
LOADING: RxComputationThreadPool-1 
FINISHED: main 

回答

4

你不能用observeOn做到這一點,因爲這將緩衝至少1元素,因此如果已經存在一個「LOADING」,則始終執行「PRE-LOADING」階段。

但是你可以用delay做到這一點,因爲它不操作上鍊和時間表要求量的每個onNext單獨上無需排隊它自身的調度:

public static void main(String[] args) throws Exception { 
    Subject<Boolean> loadingQueue = 
     PublishSubject.<Boolean>create().toSerialized(); 

    loadingQueue 
     .toFlowable(BackpressureStrategy.LATEST) 
     .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())  // <------- 
     .map(discarded -> { 
     // PRE-LOADING 
     System.out.println("PRE-LOADING: " 
      + Thread.currentThread().getName()); 
     return discarded; 
     }) 
     .delay(0, TimeUnit.MILLISECONDS, Schedulers.computation()) // <------- 
     .map(b -> { 
      System.out.println("LOADING: " 
      + Thread.currentThread().getName()); 
     Thread.sleep(2000); 
     return b; 
     }) 
     .delay(0, TimeUnit.MILLISECONDS, Schedulers.single())  // <------- 
     .rebatchRequests(1)    // <----------------------------------- one-by-one 
     .subscribe(b -> { 
      System.out.println("FINISHED: " 
       + Thread.currentThread().getName() + "\n\n"); 
     }); 


    loadingQueue.onNext(true); 
    loadingQueue.onNext(true); 
    loadingQueue.onNext(true); 

    Thread.sleep(10000); 
} 
+0

這工作很漂亮,非常感謝你許多。我必須承認,我永遠無法想出這個。我現在必須更詳細地研究延遲和重新補充請求。 – strangetimes

+0

@akarnokd我很好奇,我發現,與'observeOn替換在原始代碼中的第一個'observeOn'(Schedulers.single(),假,1)'結果在相同的行爲 - 這是一個有效的替代方案,或是這樣做有一些缺點嗎?謝謝! – ahmedre

+0

如果項目是可區分的,那麼是的;它將存儲項目#2,而最新的項目運行到項目#N。當處理結束時,新的請求將處理項目#2,並且第一個observeOn將存儲項目#N。你不會得到最新的但有些時候的快照。 – akarnokd

相關問題