2016-04-08 22 views
5

我有下面的類(簡體):如何測試observable在RxJava中使用正確的調度程序?

public class Usecase<T> { 
    private final Observable<T> get; 
    private final Scheduler observeScheduler; 

    public Usecase(Observable<T> get, Scheduler observeScheduler) { 
     this.get = get; 
     this.observeScheduler = observeScheduler; 
    } 

    public Observable<T> execute() { 
     return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler); 
    } 
} 

而且我編寫單元測試它。我如何測試subscribeOnobserveOn是否被調用了正確的值?

我嘗試以下方法:

Observable<String> observable = mock(Observable.class); 
    Usecase<String> usecase = new Usecase(observable, Schedulers.computation()); 
    usecase.execute(); 

    verify(observable).subscribeOn(Schedulers.computation()); // should fail here, but passes 
    verify(observable).observeOn(Schedulers.computation()); // should pass, but fails: Missing method call for verify(mock) here 

上述失敗(我認爲),因爲subscribeOnobserveOnfinal方法。那麼可能有其他方法可以確保observable使用正確的調度程序?

回答

3

There is a way可以間接訪問觀察點正在運行和被觀察的兩個線程,這意味着您實際上可以驗證Observable使用正確的調度程序。

我們僅限於通過名稱驗證線程。幸運的是,Schedulers.io()使用的線程以一致的前綴命名,我們可以匹配。下面是一個有獨特的前綴參考調度的(滿了嗎?)名單:

  • Schedulers.io() - RxCachedThreadScheduler
  • Schedulers.newThread() - RxNewThreadScheduler
  • Schedulers.computation() - RxComputationThreadPool

要驗證Observable是訂閱IO線程上:

// Calling Thread.currentThread() inside Observer.OnSubscribe will return the 
// thread the Observable is running on (Schedulers.io() in our case) 
Observable<String> obs = Observable.create((Subscriber<? super String> s) -> { 
    s.onNext(Thread.currentThread().getName()); 
    s.onCompleted(); 
}) 

// Schedule the Observable 
Usecase usecase = new Usecase(obs, Schedulers.immediate()); 
Observable usecaseObservable = usecase.execute(); 

// Verify the Observable emitted the name of the IO thread 
String subscribingThread = usecaseObservable.toBlocking().first(); 
assertThat(subscribingThread).startsWith("RxCachedThreadScheduler"); 

要驗證可觀察到的是計算線程觀察到,您可以使用TestSubscriber#getLastSeenThread訪問用於觀察的最後一個線程。

TestSubscriber<Object> subscriber = TestSubscriber.create(); 
UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation()) 
usecase.execute().subscribe(subscriber); 

// The observable runs asynchronously, so wait for it to complete 
subscriber.awaitTerminalEvent(); 
subscriber.assertNoErrors(); 

// Verify the observable was observed on the computation thread 
String observingThread = subscriber.getLastSeenThread().getName(); 
assertThat(observingThread).startsWith("RxComputationThreadPool"); 

沒有第三方庫或嘲諷是必要的,雖然我使用AssertJ的流暢startsWith斷言。

2

Observable上應用運算符會返回一個新的Observable,因此任何後續的運算符應用程序都將發生在不同的對象上。您必須遵循Observable中的組合圖來發現應用了哪些運算符以及哪些參數。

這在RxJava中不受支持,您必須依賴內部細節和反思。

一般來說,你不能假設任何關於事件發生的位置,但你可以申請observeOn以確保他們進入你選擇的線程。

相關問題