2012-12-10 22 views
7

我想創建一個使用MSMQ綁定的WCF服務,因爲我有大量的服務要處理的通知。重要的是,客戶不會受到服務的阻礙,並且通知按照他們提出的順序進行處理,從而實現隊列。如何使用多個WCF服務實例執行消息隊列順序

另一個考慮因素是韌性。我知道我可以羣集MSMQ本身來使隊列更健壯,但我希望能夠在不同的服務器上運行我的服務實例,所以如果服務器崩潰通知不會在隊列中建立起來,而是另一臺服務器繼續處理。

我已經試驗了MSMQ綁定,發現您可以讓一個服務的多個實例在同一個隊列上監聽,並留給自己,最終他們做了一種循環法,負載遍佈可用服務。這很好,但最終我失去了隊列的排序,因爲不同的實例需要不同的時間來處理請求。

我一直在使用一個簡單的控制檯應用程序進行實驗,這是下面的史詩般的代碼轉儲。當它運行我得到的輸出是這樣的:

host1 open 
host2 open 
S1: 01 
S1: 03 
S1: 05 
S2: 02 
S1: 06 
S1: 08 
S1: 09 
S2: 04 
S1: 10 
host1 closed 
S2: 07 
host2 closed 

我希望發生的是:

host1 open 
host2 open 
S1: 01 
<pause while S2 completes> 
S2: 02 
S1: 03 
<pause while S2 completes> 
S2: 04 
S1: 05 
S1: 06 
etc. 

我本來以爲是S2還沒有完成,它可能仍然失敗,並返回消息它正在處理隊列。因此,S1不應該被允許從隊列中拉出另一條消息。我排隊我們交易,我已經嘗試設置TransactionScopeRequired = true服務,但無濟於事。

這甚至可能嗎?我是否以錯誤的方式去做?如果沒有某種中央同步機制,是否有其他方法可以構建故障轉移服務?

class WcfMsmqProgram 
{ 
    private const string QueueName = "testq1"; 

    static void Main() 
    { 
     // Create a transactional queue 
     string qPath = ".\\private$\\" + QueueName; 
     if (!MessageQueue.Exists(qPath)) 
      MessageQueue.Create(qPath, true); 
     else 
      new MessageQueue(qPath).Purge(); 

     // S1 processes as fast as it can 
     IService s1 = new ServiceImpl("S1"); 
     // S2 is slow 
     IService s2 = new ServiceImpl("S2", 2000); 

     // MSMQ binding 
     NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None); 

     // Host S1 
     ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private")); 
     ConfigureService(host1, binding); 
     host1.Open(); 
     Console.WriteLine("host1 open"); 

     // Host S2 
     ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private")); 
     ConfigureService(host2, binding); 
     host2.Open(); 
     Console.WriteLine("host2 open"); 

     // Create a client 
     ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName)); 
     IService client = factory.CreateChannel(); 

     // Periodically call the service with a new number 
     int counter = 1; 
     using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500)) 
     { 
      // Enter to stop 
      Console.ReadLine(); 
     } 

     host1.Close(); 
     Console.WriteLine("host1 closed"); 
     host2.Close(); 
     Console.WriteLine("host2 closed"); 

     // Wait for exit 
     Console.ReadLine(); 
    } 

    static void ConfigureService(ServiceHost host, NetMsmqBinding binding) 
    { 
     var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName); 
    } 

    [ServiceContract] 
    interface IService 
    { 
     [OperationContract(IsOneWay = true)] 
     void EchoNumber(int number); 
    } 

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] 
    class ServiceImpl : IService 
    { 
     public ServiceImpl(string name, int sleep = 0) 
     { 
      this.name = name; 
      this.sleep = sleep; 
     } 

     private string name; 
     private int sleep; 

     public void EchoNumber(int number) 
     { 
      Thread.Sleep(this.sleep); 
      Console.WriteLine("{0}: {1:00}", this.name, number); 
     } 
    } 
} 
+0

你得很難與WCF的MSMQ做到這一點結合(見http://stackoverflow.com/questions/729612/ordered-delivery-with-netmsmqbinding)。但是,您可以使用事務性MSMQ和一些對帳代碼(不含WCF)。 –

回答

10

batwad,

您正嘗試手動創建服務總線。你爲什麼不嘗試使用現有的?

NServiceBus,MassTransit,ServiceStack

至少與那些MSMQ工作2。

此外,如果您絕對需要訂購,可能實際上是由於其他原因 - 您希望能夠發送消息,並且不希望在第一條消息之前處理依賴消息。你正在尋找傳奇模式。 NServiceBus和MassTransit都將允許您輕鬆管理Sagas,他們都可以讓您簡單地觸發初始消息,然後根據條件觸發剩餘的消息。它將允許您快速實現分佈式應用程序的豐富功能。

然後,您甚至可以擴展到數千個客戶端,隊列服務器和消息處理器,而無需編寫一行代碼,也沒有任何問題。

我們試圖通過msmq在這裏實現我們自己的服務總線,我們放棄了,因爲另一個問題一直在蔓延。我們使用NServiceBus,但MassTransit也是一個很好的產品(它是100%開源的,NServiceBus不是)。 ServiceStack在製作API和使用消息隊列方面非常出色 - 我相信您可以使用它在幾分鐘內使服務充當Queue前端。

哦,我是否提到在NSB和MT的情況下,只需要10行代碼即可完全實現隊列,發送方和處理程序?

----- -----的新增

烏迪大寒(NServiceBus的主要貢獻者之一)在談到這一點: "In-Order Messaging a Myth" by Udi Dahan "Message Ordering: Is it Cost Effective?" with Udi Dahan

克里斯·帕特森(的主要原因之一地下的貢獻者) "Using Sagas to ensure proper sequential message order" question

StackOverflow的問題/回答: "Preserve message order when consuming MSMQ messages in a WCF application"

-----問題-----

我必須說,我很困惑,爲什麼你需要保證消息順序 - 如果你使用的是HTTP/SOAP,你會處於相同的位置協議?我的猜測是否定的,那爲什麼它在MSMQ中是個問題?

祝你好運,希望這有助於

+0

我正在提出相互依賴的通知。例如,一列火車在到達那裏之前不能離開火車站。我想確保在OnArrive之後處理OnDepart通知,否則OnDepart處理可能不起作用(例如,:如果它還沒有處理相應的OnArrive,那麼火車在火車站停留的時間還不能計算出來) – batwad

+0

在OnDepart之前,您絕對不應該上OnArrive,這也意味着您不應該發送它。看看佐賀模式,這正是你的意思 - 你需要管理MSMQ以外的狀態。 –

+0

好吧,我想我終於明白了......您正在發送一個OnDepart事件,但是您擔心OnArrive事件稍後在OnDepart事件觸發時可能還沒有處理完......是的,我看到了這個問題 - 但不幸的是,我不能真正給你一個黃金解決方案,除了可能:使用某種唯一的標識符,如果你不能保證MSMQ發送消息在另一個之前(火車起源網絡崩潰,因此OnDepart卡在那裏,和最終火車到達),那麼你必須圍繞它編碼。如果你得到一個OnArrive事件,並且OnDepart –

1

確保消息的按順序傳遞是高卷消息事實上的棘手問題之一。

在理想的世界中,您的消息目標應該能夠處理亂序消息。這可以通過確保您的消息源包含某種排序信息來實現。理想情況下,這又採用某種x-n批量印章的形式(第10條消息中的第1條,第10條中的第2條等)。一旦交付數據,您的消息目的地就需要將數據彙編成訂單。

但是,在現實世界中,通常沒有改變下游系統來處理不按順序發送的消息的空間。在這種情況下,你有兩個選擇:

  1. 轉到完全單線程的 - 實際上,你通常可以找到某種「分組標識」,這意味着你可以去單線程中的for-each-羣感測,這意味着你的在不同的消息組中仍然具有併發性。
  2. 在您希望收到有序消息的每個消費者系統周圍實施一個re-sequencer包裝。

這兩種解決方案都不是很好,但這是我認爲可以擁有併發性和按順序傳遞消息的唯一方法。

+0

有趣,但是如果一個ServerA從隊列中取消了10個消息中的一個,並且ServerB將隊列中的2個消息取出10個,那麼兩個都不會獲得完整的批次以重新排序。 – batwad

+0

所以在這個配置中你需要每個組一個隊列。這不是很實際。 –