我想創建一個使用MSMQ綁定的WCF服務,因爲我有大量的服務要處理的通知。重要的是,客戶不會受到服務的阻礙,並且通知按照他們提出的順序進行處理,從而實現隊列。如何使用多個WCF服務實例執行消息隊列順序
另一個考慮因素是韌性。我知道我可以羣集MSMQ本身來使隊列更健壯,但我希望能夠在不同的服務器上運行我的服務實例,所以如果服務器崩潰通知不會在隊列中建立起來,而是另一臺服務器繼續處理。
我已經試驗了MSMQ綁定,發現您可以讓一個服務的多個實例在同一個隊列上監聽,並留給自己,最終他們做了一種循環法,負載遍佈可用服務。這很好,但最終我失去了隊列的排序,因爲不同的實例需要不同的時間來處理請求。
我一直在使用一個簡單的控制檯應用程序進行實驗,這是下面的史詩般的代碼轉儲。當它運行我得到的輸出是這樣的:
host1 open
host2 open
S1: 01
S1: 03
S1: 05
S2: 02
S1: 06
S1: 08
S1: 09
S2: 04
S1: 10
host1 closed
S2: 07
host2 closed
我希望發生的是:
host1 open
host2 open
S1: 01
<pause while S2 completes>
S2: 02
S1: 03
<pause while S2 completes>
S2: 04
S1: 05
S1: 06
etc.
我本來以爲是S2還沒有完成,它可能仍然失敗,並返回消息它正在處理隊列。因此,S1不應該被允許從隊列中拉出另一條消息。我排隊我們交易,我已經嘗試設置TransactionScopeRequired = true
服務,但無濟於事。
這甚至可能嗎?我是否以錯誤的方式去做?如果沒有某種中央同步機制,是否有其他方法可以構建故障轉移服務?
class WcfMsmqProgram
{
private const string QueueName = "testq1";
static void Main()
{
// Create a transactional queue
string qPath = ".\\private$\\" + QueueName;
if (!MessageQueue.Exists(qPath))
MessageQueue.Create(qPath, true);
else
new MessageQueue(qPath).Purge();
// S1 processes as fast as it can
IService s1 = new ServiceImpl("S1");
// S2 is slow
IService s2 = new ServiceImpl("S2", 2000);
// MSMQ binding
NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);
// Host S1
ServiceHost host1 = new ServiceHost(s1, new Uri("net.msmq://localhost/private"));
ConfigureService(host1, binding);
host1.Open();
Console.WriteLine("host1 open");
// Host S2
ServiceHost host2 = new ServiceHost(s2, new Uri("net.msmq://localhost/private"));
ConfigureService(host2, binding);
host2.Open();
Console.WriteLine("host2 open");
// Create a client
ChannelFactory<IService> factory = new ChannelFactory<IService>(binding, new EndpointAddress("net.msmq://localhost/private/" + QueueName));
IService client = factory.CreateChannel();
// Periodically call the service with a new number
int counter = 1;
using (Timer t = new Timer(o => client.EchoNumber(counter++), null, 0, 500))
{
// Enter to stop
Console.ReadLine();
}
host1.Close();
Console.WriteLine("host1 closed");
host2.Close();
Console.WriteLine("host2 closed");
// Wait for exit
Console.ReadLine();
}
static void ConfigureService(ServiceHost host, NetMsmqBinding binding)
{
var endpoint = host.AddServiceEndpoint(typeof(IService), binding, QueueName);
}
[ServiceContract]
interface IService
{
[OperationContract(IsOneWay = true)]
void EchoNumber(int number);
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
class ServiceImpl : IService
{
public ServiceImpl(string name, int sleep = 0)
{
this.name = name;
this.sleep = sleep;
}
private string name;
private int sleep;
public void EchoNumber(int number)
{
Thread.Sleep(this.sleep);
Console.WriteLine("{0}: {1:00}", this.name, number);
}
}
}
你得很難與WCF的MSMQ做到這一點結合(見http://stackoverflow.com/questions/729612/ordered-delivery-with-netmsmqbinding)。但是,您可以使用事務性MSMQ和一些對帳代碼(不含WCF)。 –