2017-07-14 23 views
0

我是新來的整個反應範式,我試圖瞭解如何從像SQS隊列讀取時背壓是如何工作的。Reactor/RxJava SQS

反應堆你有你的流量和RxJava你有你的可觀察投票SQS像背景:

while (true) { 
    Future<ReceiveMessageResult> future = sqsClient.receiveMessageAsync(queueUrl); 
    //emit or send to subscribers 
} 

比方說你有一個需要做出REST調用是下游組件價格有限。你如何告訴輪詢者由於速度限制而放慢速度,所以你最終不會在內存中留下一堆帶有OOM潛力的實時信息?

回答

1

在這種情況下,您需要使用Flowable。使用Flowable,用戶可以在處理接收到的消息後,請求下一批次的同時請求一定數量的消息。

編號:https://medium.com/@srinuraop/rxjava-backpressure-3376130e76c1

Flowable<Integer> observable = Flowable.range(1, 133); 
observable.subscribe(new DefaultSubscriber<Integer>() { 
@Override public void onStart() { 
request(1); 
} 
@Override public void onNext(Integer t) { 
LOGGER.info(「item 「+t); 
//this where you request message, in this case one message at time and after processing one message it will request for next one. 
request(1); 
} 
@Override public void onError(Throwable t) { 
LOGGER.info(「」+t); 
} 
@Override public void onComplete() { 
LOGGER.info(「complete」); 
} 
}); 
+0

明白了!謝謝! – 2Real