我使用的Redis作爲隊列(使用彈簧隊列入/出站通道適配器)來分發任務(一個消息到隊列等)Redis的隊列與高通量彈簧一體化是損耗消息
由於吞吐量相當高,我們觀察到,儘管消息已發送到redis隊列,但其中很多消息都已丟失,入站(報頭路由器)後沒有任何消息到達組件(
通道配置爲附在下面;問題的關鍵在於我們雖然問題出現在入站加載器後面的頭部路由器中,但無法管理從隊列中讀取的消息的速率,因此它們丟失了。
我們在入站適配器和這個組件之間使用了一箇中間元素(這是一個頭路由器)並添加一個隊列來解決這個問題。
這工作正常,但實際上我們並沒有完全理解解決方案,如果這是正確的。
有關此配置的專家觀點和看法將會很好!
感謝
<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
from a Redis List. -->
<redis:queue-inbound-channel-adapter
id="fromRedis" channel="in" queue="${name}"
receive-timeout="1000" recovery-interval="3000" expect-message="true"
auto-startup="true"/>
<!-- a queue to avoid lost messages before the header router -->
<int:channel id="in">
<int:queue capacity="1000"/>
</int:channel>
<!-- a bridge to connect channels and have a poller -->
<int:bridge input-channel="in" output-channel="out">
<int:poller fixed-delay="500" />
</int:bridge>
<int:header-value-router id="router" timeout="15000"
input-channel="out" header-name="decision"
resolution-required="false" default-output-channel="defaultChannel" />
---在26/02
加入到消息插入redis的,我們有一個Web服務,但實際上是如你所說,只是寫郵件到Redis的(
for... channel.send(msg)
沒有更多
關於你的回答我現在正在考慮刪除in channel和它的隊列,並直接使用header-value-router;但我有更多的問題:
我認爲正確的解決方案是在報頭值路由器超時低值,所以我有錯誤通知,如果沒有我們提供消費者更快。如果我不使用超時值,它會無限期地阻塞,這是一個壞主意,不是嗎?
我不知道如何管理MesssageDeliveryException,因爲路由器沒有錯誤通道配置,???
我認爲,如果我可以管理這個錯誤並獲取消息,我可以重新發送給redis。還有其他服務器從redis獲取消息,他們很幸運可以參加。
添加我提出的解決方案,而不是完整的,我們不知道的錯誤管理正如我上面
<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
from a Redis List. -->
<redis:queue-inbound-channel-adapter
id="fromRedis" channel="in" queue="${name}"
receive-timeout="1000" recovery-interval="3000" expect-message="true"
auto-startup="true"/>
<!-- a header-value-router with quite low timeout -->
<int:header-value-router id="router" timeout="150"
input-channel="in" header-name="decision"
resolution-required="false" default-output-channel="defaultChannel" />
<!-- ¿if MessageDeliveryException???? what to do??? -->
<int:channel id="someConsumerHeaderValue">
<int:dispatcher task-executor="ConsumerExecutor" />
</int:channel>
<!-- If 5 threads are busy we queue messages up to 5; if queue is full we can increase to 5 more working threads; if no more threads we have a... ¿¿MessageDeliveryException?? -->
<task:executor id="ConsumerExecutor" pool-size="5-5"
queue-capacity="5" />
謝謝阿爾喬姆,我在我的問題上添加了更多信息,如果您能給出您的意見,我將非常感謝 – earroyoron
在我的答案中查看更新。 –
對不起,在驗證你的偉大(一如既往)答案時,謝謝 – earroyoron