2013-04-16 69 views
1

處理程序接收到的每條消息都包含聚合根的整個狀態。系統然後能夠根據這些數據執行所需的操作。在我的場景中,根據消息中的數據授予訪問權限,例如進入房間A & B.消息包含整套授權訪問。 這些消息可能無序到達,因爲諸如MSMQ之類的消息系統不保證有序交付。如何使用Rebus過濾消息?

消息#1允許訪問房間A & B,但消息#2僅授予對房間A的訪問權限。如果他們到達亂序,則將訪問權限授予A房間,然後再訪問到房間A & B.這不是理想的結果。只有進入房間A應該被授予。每個消息都包含一個時間戳,它在發佈時設置。我想使用此時間戳來刪除無序到達的消息,例如舊的。如果消息#2在消息#1之前到達,則消息1#應該被丟棄。

我可以在每個處理程序方法中實現這個過濾器,但這將是單調乏味的,所以我希望Rebus有一些沿着EAI Message Filters的行?

我打開其他選項/實現?

回答

1

我不確定我明白你爲什麼會傳遞消息中的整個聚合根,但除此之外,Rebus還有一個簡單的方法可以讓你對消息進行排序。

我建議你讓所有的消息實現一個通用的接口來捕獲這個特定的方面,像

public interface IHaveSequenceNumber 
{ 
    int SequenceNumber { get; } 
} 

然後,創建遇到舊消息時處理IHaveSequenceNumber並中止處理流水線一條簡單的信息處理程序,例如

public class WillDiscardOldMessages : IHandleMessages<IHaveSequenceNumber> 
{ 
    public void Handle(IHaveSequenceNumber messageWithSequenceNumber) 
    { 
     if (IsTooOld(messageWithSequenceNumber)) 
     { 
      MessageContext.GetCurrent().Abort(); //< make this the last handler 
     } 
    } 
} 

然後 - 非常重要的:) - 確保您的消息過濾器永遠是第一位在處理器的流水線時,調度開始:

Configure.With(...) 
    .Transport(...) 
    .SpecifyOrderOfHandlers(s => s.First<WillDiscardOldMessages>()) 
    .(...) 

IsTooOld()方法的上面,我會離開的實施對你來說 - 如果你的端點只有一個單獨的工作線程,它可能會很簡單,而併發處理這些東西並不是微不足道的。

這有道理嗎?

+0

我將聚合根存儲在消息中而不是標識符,因此處理程序不必檢索中央存儲庫中的數據。這是很常見的做法,不是嗎? – Lybecker

+0

我想我明白了。我應該有多個處理程序,但要確保第一個丟棄_old_消息,例如過濾器。當我有同一個消息的多個消費者(訂閱相同消息類型的多個處理程序)時,這會工作嗎?信息是否會同時傳遞給每個處理程序? – Lybecker

+1

你是什麼意思「將信息同時傳遞給每個處理程序」? Rebus將得到傳入消息的一系列處理程序,根據你的規範來排序(即'.First ()'),然後按順序遍歷該消息並將消息分發給每個...處理單個消息時沒有並行性 - 並行性是通過讓多個工作人員在給定的時間點處理單個消息來實現的。 – mookid8000

相關問題