2014-07-25 126 views
0

我的要求是讓訂閱者根據Web服務是否啓動來暫停處理消息。所以,當Web服務關閉時,消息應該繼續從發佈服務器發送到訂閱服務器隊列,並繼續堆疊,直到Web服務再次啓動。 (這些消息不應該進入錯誤隊列,而是留在訂閱者隊列中。)在發佈/訂閱模型中,如何根據某些外部狀態使訂閱者暫停處理?

我試圖使用取消訂閱,但發佈者停止發送消息,因爲取消訂閱似乎清除了RavenDB上的訂閱信息。我也嘗試在Transport類上設置MaxConcurrencyLevel,如果我將工作線程設置爲0,則發送到Subscriber的消息直接進入錯誤隊列。最後,我嘗試了Defer,它似乎將當前消息放入審計隊列中,並在超時完成時創建消息的克隆並將其本地發送到訂閱者隊列。另外,由於我必須不斷檢查服務狀態並保持推遲,所以我無法控制消息的順序,因爲我無法預測Web服務何時啓動。

達到我所解釋的行爲的最佳方式是什麼?我正在使用NServiceBus版本4.5。

+0

用戶本身是否能夠檢查服務是否在線? –

+0

是的,用戶可以檢查服務是否啓動。 –

+0

如果服務關閉,您可以使用saga和超時。一旦處理程序與Web服務交互完成並回覆成功消息,即可完成傳奇。合理? –

回答

0

這聽起來像你想不斷嘗試處理一條消息,直到它成功,而不是將其洗回隊列中(保持在頂部並繼續嘗試)?

我認爲你唯一的純NSB選項是用來控制第一級重試的MaxRetries設置:http://docs.particular.net/nservicebus/msmqtransportconfig。將MaxRetries設置爲很高的數字可能會做你正在尋找的東西,但我無法想象這樣做會是一個好的做法。

二級重試會將消息推遲一段可配置的時間,但IIRC將允許其他消息從主隊列中處理。

我認爲你最好的選擇是把重試邏輯放到你自己的代碼中。因此,處理程序可以嘗試訪問在循環中(也許在延遲)的時間服務x次,它拋出一個異常,NSB的重試功能,在踢之前

編輯:

你的要求似乎是這樣的:

「當MyEvent進來時,我需要做一個web服務調用,如果web服務關閉了,我需要繼續以Y的間隔嘗試X次,此時我會認爲它是一個失敗並處理失敗情況,直到我成功或失敗,我將阻止其他消息被處理。「

您在處理消息(重試,超時,錯誤條件,阻塞其他消息等)方面有一些潛在的複雜邏輯。請記住NSB打算在系統中扮演的角色:通過消息傳遞服務之間的通信。雖然NSB確實具有一些允許消息編排(如sagas)的高級功能,但它並非真正用於替換域或應用程序邏輯。

底線,您可能需要編寫自定義代碼來處理您的特定情況。一個天真的解決方案將是一個循環,在處理程序中有一段延遲,但您可能需要創建一個更強大的內存中收集/隊列,以便在服務停止時保留消息,並在其恢復時連續處理它們。

+0

感謝您的回覆。我不想因爲你提到的同樣的原因而添加重試。有一件事似乎有效,就是讓當前的線程睡眠,我不認爲這是一個好的做法。 –

+0

更新了我的答案。 –

0

達到一定程度所需的行爲最簡單的方法是:

定義它檢查服務是否可用,如果沒有消息處理函數調用HandleCurrentMessageLater和執行實際的消息處理消息處理程序。然後指定消息處理程序的順序,以便首先執行檢查服務可用性的處理程序。

public interface ISomeCommand {} 

public class ServiceAvailabilityChecker : IHandleMessages<ISomeCommand>{ 
    public IBus Bus { get; set; }  
    public void Handle(ISomeCommand message) { 
     try { 
     // check service 
     } 
     catch(SpecificException ex) { 
     this.Bus.HandleCurrentMessageLater(); 
     } 
    } 
}  
public class ActualHandler : IHandleMessages<ISomeCommand>{ 
    public void Handle(ISomeCommand message) { 
    } 
}  
public class SomeCommandHandlerOrdering : ISpecifyMessageHandlerOrdering{ 
    public void SpecifyOrder(Order order){ 
      order.Specify(First<ServiceAvailabilityChecker>.Then<ActualHandler>()); 
    } 
} 

有了這設計您將獲得以下內容:

  • 您可以檢查可用性實際業務代碼調用
  • 之前如果服務不可用的消息放回到隊列
  • 如果服務可用並且您的業務代碼被調用,但是在調用ActualHandler之前服務變得不可用,那麼您將獲得第一級和第二級重試(以及管道中的可用性檢查)