2013-11-14 99 views
0

喜aggrate一個消息分成多個組,我們正試圖流處理金融市場數據通過利用Apache的駱駝或彈簧集成計算交易信號。 我們的用例之一是聚集連續的價格一起基於價格時間戳如下:如何通過Spring集成聚合

  • 輸入

輸入消息當屬(時間戳,價格)對時間序列。假設進來作爲值,每對(TX,PX)是用於時間標記和P爲價格值

(T0,P1),(T1,P1),(T2,P2),(T3,P3),(T4,P4)... 
  • 聚合

假設我們需要聚合而T A的消息每3個連續的消息一起進行進一步計算,給定輸入消息我們需要產生下列組,每3對組是一個彙總消息:

[(T0,P1),(T1,P1),(T2,P2)], 
[(T1,P1),(T2,P2),(T3,P3)], 
[(T2,P2),(T3,P3),(T4,P4)], 
.... 

正如您所看到的,大部分消息將被聚合到多個組。有人可以建議是否有辦法通過使用當前聚合器而不是寫一個。

看來,Spring集成總分組是根據相關的關鍵一樣,所以這些消息將需要映射到一組相關的密鑰。但是,目前的api似乎只允許我們生成一個關聯密鑰,這意味着每條消息只能聚合到一個組中。有沒有解決這個問題。

P.S.

閱讀駱駝的源代碼後,駱駝似乎不能支持我們的要求。春天試試我的運氣吧。 手指交叉camel question

+0

對不起 - 我錯讀了你的問題。 –

+0

我用一個可能的解決方案解決了我的問題。 –

回答

1

我們沒有什麼現成的,但我可以做你想做的一個小修正SimpleMessageStore。我已發佈全部RollingMessageStorein a gist

底線是修改removeGroup僅刪除第一消息,而不是整個組。此外,使completeGroup不適用。

設置expreGroupOnCompletion迫使聚合器「去除」的組(通過調用修改removeGroup()方法。

這裏是SimpleMessageGroupRollingMessageGroup之間的差異...

182,184c190,194 
< 
<    groupUpperBound.release(groupIdToMessageGroup.get(groupId).size()); 
<    groupIdToMessageGroup.remove(groupId); 
--- 
>    Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne(); 
>    if (message != null) { 
>     this.groupUpperBound.release(1); 
>     this.removeMessageFromGroup(groupId, message); 
>    } 

(加刪除所有代碼中completeGroup()

和測試案例...

@Test 
public void testRolling() { 
    AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore()); 
    aggregator.setExpireGroupsUponCompletion(true); 
    aggregator.setReleaseStrategy(new ReleaseStrategy() { 

     @Override 
     public boolean canRelease(MessageGroup group) { 
      return group.size() == 3; 
     } 
    }); 
    QueueChannel replyChannel = new QueueChannel(); 
    Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null); 
    Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null); 
    Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null); 
    Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null); 
    Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null); 

    aggregator.handleMessage(message1); 
    aggregator.handleMessage(message2); 
    aggregator.handleMessage(message3); 
    aggregator.handleMessage(message4); 
    aggregator.handleMessage(message5); 

    Message<?> reply = replyChannel.receive(10000); 
    assertNotNull(reply); 
    assertEquals(reply.getPayload(), 105); 
    reply = replyChannel.receive(10000); 
    assertNotNull(reply); 
    assertEquals(reply.getPayload(), 315); 
    reply = replyChannel.receive(10000); 
    assertNotNull(reply); 
    assertEquals(reply.getPayload(), 693); 
} 

請着手打開JIRA New Feature Issue,我們將看看添加這個(或更一般的解決方案)即將到來的3.0版本。

使用correlation-strategy-expression="'foo'"

release-strategy-expression=size()==3

+1

非常感謝您的回覆,我創建了一個jira項目來跟蹤此https://jira.springsource.org/browse/INT-3206。 在jira我請求創建一個更通用的解決方案,它允許我們創建返回一個關聯列表中的關鍵字策略,它可以適用於更多的情況,說每個消息可以聚合到不同的組 – user2956246