你可以使用Zip
運營商,一旦他們結束拉上所有的請求一起,檢查那裏,如果他們都是成功
private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;
/**
* Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
* By default Rx is not async, only if you explicitly use subscribeOn.
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}
private Observable<String> obAsyncString() {
return Observable.just("Request1")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
private Observable<String> obAsyncString1() {
return Observable.just("Request2")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
private Observable<String> obAsyncString2() {
return Observable.just("Request3")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
在這個例子中,我們卻只是CONCAT的結果,而不是這樣做,你可以檢查結果並在那裏做你的業務邏輯。也可以考慮merge
或contact
。
您可以在這裏https://github.com/politrons/reactive
看看更多的例子創造了超過1'Schedulers.newThread()'是不必要的。它爲每個創建的工作人員創建一個新的線程,因此只需重複使用相同的調度程序就可以獲得相同的結果。 – Kiskae