2010-11-04 71 views
0

我有消息表示發送到隊列的兩個用戶之間的操作。 用戶作爲屬性存儲在消息中。 當關於一個或另一個用戶的不同操作正在同時處理時,要求不允許處理[從隊列中取出]的操作。如何實現JMS監聽器忽略基於List/Map的特定消息?

由於我使用Grails(這意味着春季)和ActiveMQ的,我想過實現這種方式:

創建交易MessageListenerContainer的,當監聽器讀取消息時,它會得到事務的狀態,檢查無論用戶A還是用戶B當前都有處理動作並取決於結果 - 處理動作或回滾jms事務。 我想到了創建一個簡單的圖來存儲當前處理的用戶的詳細信息:

{ 
    userId : [listenerId, timestamp] 
} 

我想聽衆存儲ID和時間戳,以便有緩解的錯誤處理方式[沒有到達那裏還沒有。

所以在處理操作之前,userId會放在那裏,操作完成之後鍵將被刪除。

不幸的是,我在原型設計時遇到了一個主要問題。當我回滾消息時,監聽器將等待並再次讀取相同的消息。 這意味着,如果我爲userA獲得5個操作並且只有5個偵聽器,則在處理第一個操作時整個過程將會卡住。這不是我想要的。 我希望聽者閱讀一條消息,檢查是否可以處理動作,以及是否讀取下一條消息。 它甚至有可能實現?

在我的原型有一個聽衆,當我發送5條信息,並回滾他們,這是我得到[這是從讀者:時間戳時讀取消息+味精號]:

1288892171570 0 
1288892171578 0 
1288892176582 0 
1288892181586 0 
1288892186589 0 
1288892191594 0 
1288892196596 0 
1288892201601 1 
1288892206604 1 
1288892211607 1 
1288892216612 1 
1288892221614 1 
1288892226618 1 
1288892231621 1 
1288892236625 2 
1288892241629 2 
1288892246632 2 
1288892251636 2 
1288892256641 2 
1288892261645 2 
1288892266647 2 
1288892271652 3 
1288892276656 3 
1288892281659 3 
1288892286663 3 
1288892291667 3 
1288892296671 3 
1288892301674 3 
1288892306679 4 
1288892311682 4 
1288892316686 4 
1288892321689 4 
1288892326693 4 
1288892331696 4 
1288892336700 4 

重新傳遞政策是AMQ 5.4.1的默認值。 我想看到的是: 0,1,2,3,4,0,1,2,3,4,0,1,2,3,4 ...

我以爲應該以某種方式工作,即當事務回滾並且消息被標記爲重新傳遞時,它將被延遲,但是監聽器將繼續工作。此外,當消息被重新傳遞時,它將回到它在隊列中的位置 - 消息時間戳排序的隊列[應該是?]。

實施例[假設在同一時間發送的所有5個消息]: 消息0被讀出,則進行檢查,可以加工 - >處理 MSG 1被讀出,則進行檢查,不能被處理,事務已返回,延遲X時間 讀取消息2,執行檢查,無法處理,事務回滾,延遲X時間 讀取消息3,執行檢查,無法處理,事務回滾,延遲了X的時間量 消息1重新發送,因爲X時間已過,執行檢查,無法處理,事務回滾,延遲X時間量 消息4被讀取.....

我不知道我嘗試做什麼是明智的,還是應該考慮以其他方式做?我認爲這種方式很簡單,容易擴展。 也許這只是AMQ配置的問題,但我確實看過,沒有在重新發送設置中找到任何有用的信息。

感謝, 克里斯蒂安

回答

0

看來,這是主動MQ的「功能」:在寫作的等待審查的時間https://issues.apache.org/activemq/browse/AMQ-1853 問題,但是有一個鏈接指向這可能與駱駝實施的解決方案。 我認爲這是我將要走的路[至少現在]。 我將創建兩個隊列: actionsQueue busyQueue

監聽器會從actionsQueue行動,將測試一項行動是否能夠被處理。 如果不是,它會將消息發送到busyQueue。 我將在busyQueue上監聽駱駝,並自動將消息路由到具有配置延遲的actionsQueue。

看起來像這樣是前進的方式,如果我想使用原來的想法來實現它。

我仍然想知道是否有更好的方法來做到這一點。