2011-11-30 96 views
1

我正在實現一個組件,該組件可以在特定隊列可用時讀取所有消息,但應該只能從隊列中異步刪除消息,消息內容已被處理並保留。我們讀取消息的速度比我們承認的要快(例如,我們可以閱讀10條消息,然後我們已準備好確認第一條消息)。當前的實現使用XMS API,但如果XMS不適合這些用途,我們可以切換到MQI。.net XMS中的IBM-MQ讀取器逐個確認處理後的消息

我們嘗試了兩種方法來嘗試解決這個問題,但都具有使它們無法接受的缺點。我希望有人能提出更好的方法。

第一個實現在專用線程中使用IMessageConsumer來讀取所有消息並在其可用時發佈其內容。消息處理完畢後,調用message.Acknowledge()方法。會話由AcknowledgeMode.ClientAcknowledge創建。這種方法的問題在於,根據文檔,這會確認(並刪除)已收到的所有未確認消息。以上面的例子來說,這意味着所有10個讀取的消息都會在第一次調用時收到。所以這並不能真正解決問題。因爲讀的吞吐量,我們需要的,我們確實不能修改這個解決方案,以等待第一個消息的ACK讀取第二前等

第二個實施已決定的線程使用的IQueueBrowser閱讀所有郵件和發佈他們的內容。這不會在隊列讀取時從隊列中刪除消息。一個單獨的專用線程然後等待(在BlockingQueue上)已處理的消息的JMS消息ID。對於其中的每一個,它然後構造一個專用的IMessageConsumer(使用帶有JMSMessageID的消息選擇器)來讀取消息並對其進行確認。 (IQueueBrowser與專用IMessageConsumer的配對由XMS文檔關於隊列瀏覽器的部分推薦。)此方法的工作方式與預期相同,但正如人們所想象的那樣,它在MQ服務器上佔用CPU太多。

+0

是的,這就是行爲,消息無法確認隨機的。一個消息確認將消除所有先前的消息。 MQ .NET中也沒有選項。從你的描述看來,你可以用一個確認電話確認所有消息,但你更關心吞吐量。你期望的吞吐量是多少? – Shashi

+0

預期吞吐量:最活躍的隊列上每天最多100萬條消息,MQ服務器上每個隊列每天最多1000萬條消息的組合。 – Josh

+0

只是爲了澄清:我們不能真正讀取和處理每封郵件順序(讀斷下之前)的主要原因是,實際的處理具有相當顯著交貨時間 - 這就是爲什麼我們需要同時處理許多消息。 – Josh

回答

3

問題中提出的兩種方法似乎都依賴於應用程序的單個實例。使用多個應用程序實例,事務會話和COMMIT有什麼不對?性能報告(這些是SupportPacs,名稱類似MP **)都顯示吞吐量通過多個應用程序實例最大化,而水平縮放是您場景中最常用的方法之一。

這樣的設計可能是多個應用程序實例或同一個應用程序中的多個線程。讓它正常工作的關鍵是記住事務的範圍是連接句柄。這意味着多線程應用程序必須爲每個連接實例分配一個單獨的線程,並且在同一線程中讀取消息。

處理流程是,使用事務處理會話,應用程序針對隊列執行正常的MQGet,根據需要處理消息內容,然後發出MQCommit。 (我將在我的示例中使用MQ本機API名稱,因爲這不取決於語言)。如果這是XA事務,則應用程序將調用MQBegin來啓動事務,但對於單階段提交,假定事務。在這兩種情況下,MQCommit都會結束從隊列中刪除消息的事務。當消息處於同步點時,沒有其他應用程序實例可以檢索它們; MQ只提供下一個可用消息。如果事務回滾,則假定FIFO交付,任何線程的下一個MQGet都會檢索它。

有一些樣品中:
[WMQ install home]\tools\dotnet\samples\cs\xms\simple\wmq\
...而SimpleXAConsumer.cs是一個顯示此XA版本的示例。非XA版本更簡單,因爲您不需要外部協調器,動詞等。如果您從其中一個開始,請確保它們不指定排隊使用隊列,並且可以使用相同的配置啓動多個實例。或者,也可以採用包含創建連接,消息處理,連接關閉和銷燬的樣本部分,並將所有這些部分包含在線程spawner類中。

[插入瞭解使用最新版本的類的常用建議。]

+0

現在要試試這個 - 將讓你知道它是如何工作了... – Josh

+0

所以,我曾經試圖這一點,並描述它的工作。但我有一個跟進的問題:我想有這個設置沒有簡單的方法來跳過一個特定的消息(在我的應用程序是無法處理的消息的情況下),並把它留在隊列中的其他應用程序或未來運行? – Josh

+1

接受的模式是使用返工隊列或例外隊列。 IBM JMS類將查找隊列屬性「BOQNAME」和「BOQTHRESH」。一旦消息的回退計數超過「BOQTHRESH」,消息將自動重新發送到「BOQNAME」隊列。這不需要額外的程序邏輯,只需定義隊列,在異常隊列中設置「BOQTHRESH」並指向「BOQNAME」。有些應用程序遵循類似的模式,但添加自己的邏輯來處理這個更便攜的方法的requeue。然後根據需要僅處理異常隊列。 –

相關問題