對於一些場景的我用下面的方法來等待滷麪完成所有消息的處理。 rebus端點被託管在單獨的exe中,而rebus文件系統傳輸被用於集成測試(通常是Azure SB)。集成測試將exe文件加速並在每個exe文件中Rebus被配置爲0個工作者,所以它什麼都不做。然後在測試中,我們有一個WaitForMessagesProcessed()方法來配置大量的工人和塊,直到沒有更多的消息需要處理。
這裏是怎麼了大約看起來代碼:
public class MessageProcessor() {
private string queueName;
private int messagesWaitingForProcessing;
private IBus bus;
public MessageProcessor(string queueName) {
this.queueName = queueName;
this.bus = Configure.With(adapter)
.Transport(t => t.UseFileSystem(@"c:\baseDirectory", this.queueName))
.Options(o =>
{
o.SetNumberOfWorkers(0);
})
.Events(e =>
{
e.BeforeMessageSent += (thebus, headers, message, context) =>
{
// When sending to itself, the message is not queued on the network.
var m = context.Load<Rebus.Pipeline.Send.DestinationAddresses>();
if (m.Any(t => t == this.queueName))
this.messagesWaitingForProcessing++;
};
e.AfterMessageHandled += (thebus, headers, message, context, args) =>
{
this.messagesWaitingForProcessing--;
};
})
.Start();
}
public async Task WaitForMessagesProcessed()
{
this.DetermineMessagesWaitingForProcessing();
while (this.messagesWaitingForProcessing > 0)
{
this.bus.Advanced.Workers.SetNumberOfWorkers(2);
while (this.messagesWaitingForProcessing > 0)
{
await Task.Delay(100);
}
this.bus.Advanced.Workers.SetNumberOfWorkers(0);
this.DetermineMessagesWaitingForProcessing();
}
}
public void DetermineMessagesWaitingForProcessing() {
this.messagesWaitingForProcessing = Directory.GetFiles(GetDirectoryForQueueNamed(this.queueName), "*.rebusmessage.json").Count();
}
private static string GetDirectoryForQueueNamed(string queueName)
{
return Path.Combine(this.baseDiretory, queueName);
}
}
的測試也能像
[TestMethod]
public void Test() {
var endpoint1 = LaunchExe("1");
var endpoint2 = LaunchExe("2");
endPoint1.DoSomeAction();
endPoint1.WaitForMessagesProcessed();
Assert.AreEqual("expectation", endPoint1.Query());
}
製作的擴展方法,這將是一個完美的解決方案。非常感謝! –