2017-05-15 64 views
1

有限的搜索結果比方說,我有兩個Observable S:
obs1發出一個搜索框用戶輸入的結果,
obs2需要一個字符串作爲輸入,併發起一個HTTP請求,然後提供結果實現的速度使用`Observable`s

現在,我想通過一些固定的時間間隔限制HTTP請求的數量並不但取決於當obs2與當前請求,像這樣做了:

  1. 用戶類型tobs2立即開始與t
  2. 用戶類型te的請求,obs2仍然是 「忙」,什麼都不會發生
  3. 用戶類型tesobs2仍然是 「忙」,什麼都不會發生
  4. 用戶類型testobs2仍然是「忙」,什麼都不會發生
  5. t -HTTP響應到達,obs2現在是「自由」,它着眼於obs1最後發出的值,並認爲test,開始一個新的請求
  6. test -HTTP響應已到,obs2現在是「空閒」,它看起來obs1最後發出的值並找到test,因爲該值沒有改變,因此不做任何事情。

我可以通過引入額外的變量,將指示系統的狀態和搜索查詢累加器,但是我不知道是否這可能純粹功能性的方式,即通過單獨使用rxJava方法來完成這樣做呢?

+0

你知道這可能是與近2倍等待時間延遲結束了?發出多個Web請求以避免最終用戶的長時間延遲不是更好嗎? – Enigmativity

回答

1

查看代碼和評論。

import rx.Observable; 
import rx.schedulers.Schedulers; 
import rx.subjects.PublishSubject; 
import xdean.jex.extra.Pair; 

public class Q43975663 { 
    public static void main(String[] args) throws InterruptedException { 
    PublishSubject<String> textSub = PublishSubject.create(); // emit user input text 
    PublishSubject<String> taskSub = PublishSubject.create(); // emit when execution thread is free 
    // core 
    Observable 
     // when new text input or execution thread change to free, emit an item 
     .combineLatest(textSub.distinctUntilChanged(), taskSub, Pair::of) 
     // if the text not change or task cycle not change, ignore it 
     .scan((p1, p2) -> 
      (p1.getLeft().equals(p2.getLeft()) || p1.getRight().equals(p2.getRight())) ? 
       p1 : p2) 
     .distinctUntilChanged() 
     // map to user input text 
     .map(Pair::getLeft) 
     // scheduler to IO thread 
     .observeOn(Schedulers.io()) 
     // do HTTP request 
     .doOnNext(Q43975663::httpTask) 
     // emit item to notify the execution thread is free 
     .doOnNext(taskSub::onNext) 
     .subscribe(); 
    // test 
    taskSub.onNext("start"); 
    textSub.onNext("t"); 
    textSub.onNext("te"); 
    textSub.onNext("tex"); 
    textSub.onNext("text"); 
    Thread.sleep(5000); 
    textSub.onNext("new"); 
    textSub.onNext("new"); 
    textSub.onNext("text"); 
    Thread.sleep(5000); 
    } 

    static void httpTask(String id) { 
    System.out.printf("%s \tstart on \t%s\n", id, Thread.currentThread()); 
    try { 
     Thread.sleep((long) (Math.random() * 1000)); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    System.out.printf("%s \tdone on \t%s\n", id, Thread.currentThread()); 
    } 
} 

注意Pair是一個簡單的類,有兩個值,左和右。

輸出:

t  start on Thread[RxIoScheduler-2,5,main] 
t  done on  Thread[RxIoScheduler-2,5,main] 
text start on Thread[RxIoScheduler-2,5,main] 
text done on  Thread[RxIoScheduler-2,5,main] 
new  start on Thread[RxIoScheduler-2,5,main] 
new  done on  Thread[RxIoScheduler-2,5,main] 
text start on Thread[RxIoScheduler-2,5,main] 
text done on  Thread[RxIoScheduler-2,5,main]