我們有微服務架構,我們通過網絡進行服務間呼叫。 我們在頂級服務中使用RxJava,這導致創建大量的並行請求到底層服務。 正因爲如此,我得到了「無路由到主機錯誤」或「連接錯誤」。 爲此,我想減慢RxJava Observable的排放,以便在創建新連接之前先關閉連接。 下面是示例代碼:如何延遲RxJava中的可觀察發射
package com.demo.rxjava.rxjaxa.creation;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class Delay {
public static void main(String[] args) throws InterruptedException {
Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io())
.flatMap(integer -> {
return function1(integer);
}).observeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
}
public Observable<String> function1(String id) {
// This is where we make network call
Observable<Response> response = Rx.newClient(RxObservableInvoker.class)
.target("http://example.com/resource")
.request()
.queryParam("id", id)
.rx()
.get();
response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{
return response.extractResponse();
});
}
}
這隻會延遲發射的開始,我想延遲每個特定間隔的可觀測發射。 –
查看我的新回覆 – paul
它可以工作,但是在生產代碼中它給出了缺少背壓的錯誤。 –