2016-07-02 39 views
1

我們有微服務架構,我們通過網絡進行服務間呼叫。 我們在頂級服務中使用RxJava,這導致創建大量的並行請求到底層服務。 正因爲如此,我得到了「無路由到主機錯誤」或「連接錯誤」。 爲此,我想減慢RxJava Observable的排放,以便在創建新連接之前先關閉連接。 下面是示例代碼:如何延遲RxJava中的可觀察發射

package com.demo.rxjava.rxjaxa.creation; 
    import rx.Observable; 
    import rx.Subscriber; 
    import rx.schedulers.Schedulers; 

    public class Delay { 

     public static void main(String[] args) throws InterruptedException { 
      Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io()) 
        .flatMap(integer -> { 
         return function1(integer); 
        }).observeOn(Schedulers.io()) 
        .subscribe(new Subscriber<String>() { 
         @Override 
         public void onNext(String item) { 
          System.out.println("Next: " + item); 
         } 

         @Override 
         public void onError(Throwable error) { 
          System.err.println("Error: " + error.getMessage()); 
         } 

         @Override 
         public void onCompleted() { 
          System.out.println("Sequence complete."); 
         } 
        }); 
     } 

    public Observable<String> function1(String id) { 
       // This is where we make network call 
       Observable<Response> response = Rx.newClient(RxObservableInvoker.class) 
         .target("http://example.com/resource") 
         .request() 
         .queryParam("id", id) 
         .rx() 
         .get(); 
       response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{ 
        return response.extractResponse(); 
       }); 
    } 
} 

回答

0

爲了耽誤你可以使用壓縮並結合在你的第一個Observable.from發出的每一個項目去的X時間間隔的特定步驟。

/** 
* If we want to delay the every single item emitted in the pipeline we will need a hack, 
* one possible hack is use zip operator and combine every item emitted with an interval so every item emitted has to wait until interval emit the item. 
*/ 
@Test 
public void delay() { 
    long start = System.currentTimeMillis(); 
    Subscription subscription = Observable.zip(Observable.from(Arrays.asList(1, 2, 3)), Observable.interval(200, TimeUnit.MILLISECONDS), (i, t) -> i) 
              .subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start))); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(3000, TimeUnit.MILLISECONDS); 
} 

這將打印在這裏

time:537 
    time:738 
    time:936 

更多practicle例子https://github.com/politrons/reactive

+0

這隻會延遲發射的開始,我想延遲每個特定間隔的可觀測發射。 –

+0

查看我的新回覆 – paul

+0

它可以工作,但是在生產代碼中它給出了缺少背壓的錯誤。 –

0

而不是拖延你的要求,你應該有請求到底部服務發生在Scheduler限制並行活動。例如:

int maxParallel = 4; 
Scheduler scheduler = Schedulers.from(
    Executors.newFixedThreadPool(maxParallel)); 
... 
observable 
    .flatMap(x -> 
     submitToBottomService(x) 
     .subscribeOn(scheduler)) 
    .subscribe(subscriber); 

順便說一句,你提到關閉連接。 Observable.using運算符用於關閉被動上下文中的資源(它在關閉和取消訂閱時關閉資源)。如果你還沒有使用它,然後看看它。

+0

我們正在Scheduler上運行底層服務調用,但是我們正在爲底層服務創建異步調用,所以它在第一次調用返回結果之前創建了很多連接。 –

+0

應該使用調度程序進行異步調用,您想提供更多代碼嗎? –

+0

我無法分享原始代碼,但使用sudo代碼更新了問題 –