2014-10-18 67 views
0

我有一個場景,我得到作爲輸入消息A.消息A必須被拆分成3種不同類型的消息,並轉發到其他路由。消息以精確的順​​序到達是重要的,也就是說。 A-1必須在A-2之前發送,A-2必須在A-3之前發送。駱駝多播子網路故障

爲此,我做了以下(輪廓):

from("activemq:queue:somequeue-local") 
    .multicast().to("direct:a1","direct:a2","direct:a3"); 

from("direct:a1) 
    //split incoming message and prepare output document for A-1 
    .to("activemq:queue:otherqueue") 

.from("direct:a2) 
    //split incoming message and prepare output document for A-2 
    .to("activemq:queue:otherqueue") 

.from("direct:a3) 
    //split incoming message and prepare output document for A-3 
    .to("activemq:queue:otherqueue") 

而在另一種情況下,負責發送出來的信息到外部系統,我有

.from("activemq:queue:otherqueue?maxMessagesPerTask=1&concurrentConsumers=1&maxConcurrentConsumers=1") 
     // do different stuff based on which type we are called with then end with 
    .beanref("somebean","writeToFileAndCallImportbat"); 

現在,我的問題是,當我到達接收器時,我按隨機順序收到消息。有時A-1,A-3,A-2,有時是正確的,A-1,A-2,A-3。

我曾嘗試將JMSXGroupID和JMSXGroupSeq添加到消息中,但沒有任何運氣。

我也嘗試完全跳過MQ部分,並使用direct-vm:來調用共享接收器,但是它看起來像我有三個同時接收器的同時調用,並且仍然以隨機執行順序。

我的印象是組播將按順序運行,除非另有提示?

採取的方法是否存在根本性錯誤?

我正在使用駱駝版本2.12。

或者,更明白地說:

  • 我想創建三個不同的輸出消息的路由,並執行他們的批處理文件,爲了。我該如何解決這個問題?

回答

0

如果使用Splitter pattern,你檢查,看看是否屬性設置爲false。

如果啓用,則駱駝將以流方式拆分,這意味着它將以塊的形式拆分輸入消息。這減少了內存開銷。例如,如果您將大消息分開,建議啓用流式傳輸。如果啓用流式傳輸,那麼子消息回覆將按亂序進行彙總,例如按照它們返回的順序進行彙總。如果禁用,駱駝將按照與拆分相同的順序處理子消息回覆。

+0

你是完全正確的。問題在於我如何處理分裂問題,雖然不完全是出於您陳述的原因。但考慮到我的例子沒有顯示細節,沒有辦法看到。 – Soraz 2014-10-18 16:36:55

0

所以,事實證明,多播並不是一個問題。

相反,我的每個子路由的,我這樣做:

.split(..stax(SpecialClass)).streaming() 
.beanRef("transformationBean","somefunction") 
.aggregate(constant("1"), new MyAggregator()) 
.completionTimeout(5000) 
.completionSize(1000) 
.to(writeToFileAndRunBat) 

我假定這,意味着「過程中的斷續的所有元素,如果你沒有在5秒後結束1000個元素,爆發「。

我改成了

.split(..stax(SpecialClass), , new MyAggregator()).streaming() 
.beanRef("transformationBean","somefunction") 
.end() 
.to(writeToFileAndRunBat) 

來到想起來了,它非常有意義,作爲第一個版本無法真正知道什麼時候我們做了,而最後(我認爲)只是遍歷拆分中的所有元素併爲每個元素調用聚合器。

此外,我不得不在第一個版本中.end()。所以我想整個事情都是隨機的。