2012-03-20 55 views
0

我有一個NServiceBus的用例,在this問題中有解釋。本質上有一個用於ImportProductCommand消息的消息處理程序。通常,在另一個處理程序接收到指定要導入的一組產品的ImportProductsCommand後,將這些消息的集合發送到處理程序。需要知道集合導入的狀態,因此使用傳奇來管理導入狀態。 ImportProductCommand的處理程序在完成時發佈ProductImportedEvent消息和ProductImportFailedEvent消息。傳奇訂閱了這些消息,它們使用分配給消息的初始接收時的ID將它們關聯到發起的ImportProductsCommand消息。用NServiceBus發佈到本地用戶

public class ProductImportSaga : NServiceBus.Saga.Saga<ProductImportSagaData>, 
    IAmStartedByMessages<ImportProductsCommand>, 
    IHandleMessages<IProductImportedEvent>, 
    IHandleMessages<IProductImportFailedEvent> 
{ 
    public override void ConfigureHowToFindSaga() 
    { 
     ConfigureMapping<IProductImportedEvent>(x => x.ImportId, x => x.CorrelationId); 
     ConfigureMapping<IProductImportFailedEvent>(x => x.ImportId, x => x.CorrelationId); 
    } 

    public void Handle(ImportProductsCommand message) 
    { 
     this.Data.ImportId = message.Id; 
     this.Data.Total = message.SKUs.Length; 
     foreach (var command in message.SKUs.Select(sku => new ImportProductCommand(message.SupplierId, sku, message.ImportImage, message.Id))) 
      this.Bus.SendLocal(command); // send to local handler shown below 
    } 

    public void Handle(IProductImportedEvent message) 
    { 
     this.Data.OnCompleted(); 
    } 

    public void Handle(IProductImportFailedEvent message) 
    { 
     this.Data.OnFailed(); 
    } 
} 

和個人ImportProductCommand消息的處理程序是這樣的:

// handles messages sent by saga above (or sent individually by WCF service) 
public class ImportProductHandler : IHandleMessages<ImportProductCommand> 
{ 
    public IBus Bus { get; set; } 

    public void Handle(ImportProductCommand message) 
    { 
     // importing logic here and upon success: 
     if (success) 
     { 
      this.Bus.Publish(new ProductImportedEvent(correlationId: message.Id)); 
     } 
     else 
     { 
      this.Bus.Publish(new ProductImportFailedEvent(correlationId: message.Id)); 
     } 
    } 
} 

這個問題是,當事件消息發佈他們得到放置在與託管兩個過程相關聯的隊列個人處理程序和傳奇。發生這種情況時,隊列中可能會有很多消息,這些消息最初是由傳奇爲響應ImportProductsMessage而發送的。這意味着直到隊列中的ImportProductCommand消息被處理之後,傳奇才會收到這些事件,因此批量導入的處理狀態將不會及時更新。如果我將傳奇寄託在另一個進程中,那麼它將接收到消息,而不必等待命令隊列處理。在同一過程中託管處理程序和傳奇時,是否有辦法實現相同的效果?基本上,我希望事件消息按照與ImportProductCommand消息不同的順序進行處理,即使它們位於同一隊列中,以便傳奇可以處理這些事件並相應地更新其狀態。這是可能的還是有更好的方法來實現這個結果?我試圖使用First<T>來指定消息處理程序的順序,但沒有用,而且爲了密切相關的邏輯而部署兩個不同的主機似乎過分了。

回答

2

NSB沒有優先級的概念,所以通常使用另一個端點來執行工作。這聽起來像你的工作分配後,你可能想看看Distributor。在該模型中,佐賀將保持整個工作單元的狀態,而每個端點將處理實際的處理。如果事情開始變慢,這將允許您動態添加其他端點。

如果您不想實施完整的分銷商,那麼至少將實際工作推向另一個終端將會緩解任何優先需求。

+0

我認爲與一個單獨的端點一起走可能是一條路,因爲還有另一個需求來支持手動請求的個別導入優先於批量導入命令排隊的各個導入之上。 – eulerfx 2012-03-20 18:34:41

+0

然後,您可以指定其中一個節點來僅處理手動導入,並通過某個用戶界面與其交互。 – 2012-03-20 21:49:53