2016-02-25 31 views
2

我使用的Redis作爲隊列(使用彈簧隊列入/出站通道適配器)來分發任務(一個消息到隊列等)Redis的隊列與高通量彈簧一體化是損耗消息

由於吞吐量相當高,我們觀察到,儘管消息已發送到redis隊列,但其中很多消息都已丟失,入站(報頭路由器)後沒有任何消息到達組件(

通道配置爲附在下面;問題的關鍵在於我們雖然問題出現在入站加載器後面的頭部路由器中,但無法管理從隊列中讀取的消息的速率,因此它們丟失了。

我們在入站適配器和這個組件之間使用了一箇中間元素(這是一個頭路由器)並添加一個隊列來解決這個問題。

這工作正常,但實際上我們並沒有完全理解解決方案,如果這是正確的。

有關此配置的專家觀點和看法將會很好!

感謝


<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages 
    from a Redis List. --> 
    <redis:queue-inbound-channel-adapter 
    id="fromRedis" channel="in" queue="${name}" 
     receive-timeout="1000" recovery-interval="3000" expect-message="true" 
      auto-startup="true"/> 

    <!-- a queue to avoid lost messages before the header router --> 
    <int:channel id="in"> 
     <int:queue capacity="1000"/> 
    </int:channel> 

    <!-- a bridge to connect channels and have a poller --> 
    <int:bridge input-channel="in" output-channel="out"> 
     <int:poller fixed-delay="500" /> 
    </int:bridge> 

    <int:header-value-router id="router" timeout="15000" 
     input-channel="out" header-name="decision" 
     resolution-required="false" default-output-channel="defaultChannel" /> 

---在26/02

加入到消息插入redis的,我們有一個Web服務,但實際上是如你所說,只是寫郵件到Redis的(

for... channel.send(msg) 

沒有更多

關於你的回答我現在正在考慮刪除in channel和它的隊列,並直接使用header-value-router;但我有更多的問題:

  1. 我認爲正確的解決方案是在報頭值路由器超時低值,所以我有錯誤通知,如果沒有我們提供消費者更快。如果我不使用超時值,它會無限期地阻塞,這是一個壞主意,不是嗎?

  2. 我不知道如何管理MesssageDeliveryException,因爲路由器沒有錯誤通道配置,???

  3. 我認爲,如果我可以管理這個錯誤並獲取消息,我可以重新發送給redis。還有其他服務器從redis獲取消息,他們很幸運可以參加。

添加我提出的解決方案,而不是完整的,我們不知道的錯誤管理正如我上面

<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages 
from a Redis List. --> 
<redis:queue-inbound-channel-adapter 
id="fromRedis" channel="in" queue="${name}" 
    receive-timeout="1000" recovery-interval="3000" expect-message="true" 
     auto-startup="true"/> 

<!-- a header-value-router with quite low timeout --> 
    <int:header-value-router id="router" timeout="150" 
    input-channel="in" header-name="decision" 
    resolution-required="false" default-output-channel="defaultChannel" /> 

<!-- ¿if MessageDeliveryException???? what to do??? --> 

<int:channel id="someConsumerHeaderValue"> 
    <int:dispatcher task-executor="ConsumerExecutor" /> 
</int:channel> 
<!-- If 5 threads are busy we queue messages up to 5; if queue is full we can increase to 5 more working threads; if no more threads we have a... ¿¿MessageDeliveryException?? --> 

<task:executor id="ConsumerExecutor" pool-size="5-5" 
       queue-capacity="5" /> 

回答

1

很好的解釋,這是偉大的,看到這樣的觀察。這可能會以某種方式改進框架。

所以,我希望看到:

  1. 一些測試情況下,從框架的角度來重現。 雖然我猜這只是向Redis發送大量消息並使用您的配置消耗。 (如果有其他需要,請糾正我)

  2. <int:header-value-router>之後的下游流量。你看,你用有timeout="15000"這是同義詞的send-timeout

指定是否阻止 將消息發送到目標MessageChannels時等待 的時間以毫秒計算的最高金額是可能的(例如,一個有界隊列目前已滿的頻道)。 默認情況下,發送將無限期阻止。 'timeout'的同義詞 - 只能提供一個。

在這裏,我可以說,如果你的下游消費是否有足夠的慢一些QueueChannel有你最終的:

/** 
* Inserts the specified element at the tail of this queue, waiting if 
* necessary up to the specified wait time for space to become available. 
* 
* @return {@code true} if successful, or {@code false} if 
*   the specified waiting time elapses before space is available 
* @throws InterruptedException {@inheritDoc} 
* @throws NullPointerException {@inheritDoc} 
*/ 
public boolean offer(E e, long timeout, TimeUnit unit) 
.... 
while (count.get() == capacity) { 
     if (nanos <= 0) 
      return false; 
     nanos = notFull.awaitNanos(nanos); 
} 

注意一點return false;表示完全message lost

這也是知道像back-pressure drop策略。

讓我知道你是否有不同的圖片。

您可能會考慮刪除timeout="15000"以符合相同的in隊列通道行爲。

UPDATE

那麼,錯誤處理的工作方式有些不同的方式。 「有罪」的組件只是拋出異常,就像使用原始Java一樣,這個組件不負責捕獲異常,這是由調用者決定的。 而在我們的情況下調用者的上游組件 - <redis:queue-inbound-channel-adapter>

任何入站通道適配器都有一個error-channel選項。通過<poller>如果它是MessageSource或直接當它是MessageProducer

我敢肯定,你將能夠處理:

if (!sent) { 
    throw new MessageDeliveryException(message, 
      "failed to send message to channel '" + channel + "' within timeout: " + timeout); 
} 
error-channel子流

,實現您的恢復需求。

+0

謝謝阿爾喬姆,我在我的問題上添加了更多信息,如果您能給出您的意見,我將非常感謝 – earroyoron

+0

在我的答案中查看更新。 –

+0

對不起,在驗證你的偉大(一如既往)答案時,謝謝 – earroyoron