2014-02-09 71 views
1

我正在設計一個相當複雜的系統,並想知道在netty處理程序中放置jms使用者(activemq,vm協議,非persitent)的最佳方法是什麼。Netty處理程序中的JMS消費者?

讓我解釋一下,我有幾個客戶端使用websockets連接到我的netty服務器。對於每個客戶端連接,我都會創建一個jms使用者,用於在一個或多個主題上偵聽有趣的消息。如果一條有趣的消息到達,我需要做一個額外的步驟(額外的過濾),然後使用websocket將消息發送到客戶端。

是的一個很好的辦法做到這一點:

  • 一個SimpleChannelInboundHandler裏面我宣佈一個民辦非靜態消費者
  • 消費者在channelActive初始化
  • 消費者在channelInactive
  • 破壞
  • 當消費者接收到消息時,我做額外的過濾器a使用ctx.channel()發送它。()

在這種設置中,我有點擔心消費者可能會變成慢消費者,並放慢一切,導致WebSocket通過互聯網。

我想出了一個更復雜的解耦「消費者接收消息」和「通過websocket發送消息」。

  • 一個SimpleChannelInboundHandler裏面我宣佈一個民辦非靜態消費者
  • 消費者在channelActive初始化
  • 當由消費者我把它放在一個blockedqueue接收消息的消費者在channelInactive
  • 破壞
  • 每分鐘我讓一個線程(爲每個客戶端創建)查找隊列並使用ctx.channel()。write()將找到的消息發送到客戶端。

在這一點上,我有點擔心每個客戶端的額外線程。

或者是否有更好的方法來完成這項任務?

回答

1

這是一個經典的緩慢消費者問題,解決問題的第一步是確定檢測到緩慢消費者時適當的操作。如果緩慢的消費者錯過消息是可以接受的,那麼該解決方案在丟棄消息或從訂閱源取消訂閱它們方面存在一些變化。例如,如果客戶端錯過消息是可以接受的,那麼當從JMS接收到消息時,檢查通道是否可寫。如果不是,請刪除該消息。如果您想給自己多一點緩衝區(儘管OS緩衝區很大),您可以跟蹤未完成的寫入完成未來的數量(即消息尚未寫入OS發送緩衝區)和如果有太多未完成的寫請求,則放置消息。

如果客戶端可能不會錯過消息,並且一直很慢,那麼問題就更加困難。一種選擇可能是將消息轉移到具有特定標頭值的JMS隊列,然後打開一個新的使用者,使用JMS選擇器從該隊列中讀取消息。這會給JMS服務器帶來更多負擔,但可能適合臨時緩慢,並希望它不會干擾您的主要主題提要。或者,您可能希望將郵件存儲在不同的存儲區中,例如數據庫,以便可以在發送郵件時輪詢郵件。如果你這樣做,一個單一的輪詢線程可以處理很多客戶端(查詢具有突出消息的客戶端,然後爲每個客戶端加載一堆消息)。然而這不像使用JMS那樣方便。

我不會選擇2去,因爲阻塞隊列只會暫時解決問題,你可以通過跟蹤有多少寫操作等待完成實現同樣的事情。

+0

謝謝你對這個問題的想法,我很欣賞它,你給了我很多想法。 – TinusSky