2017-01-20 31 views
1

我正在構建一個集成測試,我正在使用InMemNetwork運行測試。Rebus - 運行單元測試並等待Rebus完成所有線程

有一個調用Thread.sleep代碼只是一個斷言之前,但那是測試的一個狡猾的方式,它會減慢我們的測試了很多。

我也做使用SagaFixtures一些集成測試和運行同步,但與註冊處理,運行處理程序和推遲的消息及其所有有點乏味簡單的下iBus實現。

有沒有辦法在由滷麪使用的所有線程等待,直到他們完成不充實,使用像ManualResetEvent的(在滷麪自己的測試中使用的)產品代碼執行?

回答

0

我通常會像你一樣使用SagaFixture,然後我使用FakeBus注入到saga處理程序中以捕獲他們的行爲。

我的大多數測試是簡單的處理程序的單元測試,雖然,但我經常會注入「真正」的服務,例如像執行IThisIThat轉到一個真實的數據庫。

的一對夫婦的情況下,雖然我旋轉起來的多個端點與在-MEM運輸,然後我通常落實InMemNetwork的擴展,它可以幫助我等具體事件發表或類似的東西 - 它可能看起來像這在測試:

var updated = await Network.WaitForNext<WhateverUpdated>(subscriberAddress, timeoutSeconds: 20); 

其中WaitForNext僅僅是通過輪詢指定subscriberAddress用於下一個消息的隊列,並試圖反序列化作爲WhateverUpdated擴展方法。

我希望能給你一些啓發:)

+0

製作的擴展方法,這將是一個完美的解決方案。非常感謝! –

0

對於一些場景的我用下面的方法來等待滷麪完成所有消息的處理。 rebus端點被託管在單獨的exe中,而rebus文件系統傳輸被用於集成測試(通常是Azure SB)。集成測試將exe文件加速並在每個exe文件中Rebus被配置爲0個工作者,所以它什麼都不做。然後在測試中,我們有一個WaitForMessagesProcessed()方法來配置大量的工人和塊,直到沒有更多的消息需要處理。

這裏是怎麼了大約看起來代碼:

public class MessageProcessor() { 
    private string queueName; 
    private int messagesWaitingForProcessing; 
    private IBus bus; 

    public MessageProcessor(string queueName) { 
     this.queueName = queueName; 

     this.bus = Configure.With(adapter) 
     .Transport(t => t.UseFileSystem(@"c:\baseDirectory", this.queueName)) 
     .Options(o => 
     { 
      o.SetNumberOfWorkers(0); 
     }) 
     .Events(e => 
     { 
      e.BeforeMessageSent += (thebus, headers, message, context) => 
      { 
       // When sending to itself, the message is not queued on the network. 
       var m = context.Load<Rebus.Pipeline.Send.DestinationAddresses>(); 
       if (m.Any(t => t == this.queueName)) 
        this.messagesWaitingForProcessing++; 
      }; 

      e.AfterMessageHandled += (thebus, headers, message, context, args) => 
      { 
       this.messagesWaitingForProcessing--; 
      }; 
     }) 
     .Start(); 
    } 

    public async Task WaitForMessagesProcessed() 
    { 
     this.DetermineMessagesWaitingForProcessing(); 
     while (this.messagesWaitingForProcessing > 0) 
     { 
      this.bus.Advanced.Workers.SetNumberOfWorkers(2); 

      while (this.messagesWaitingForProcessing > 0) 
      { 
       await Task.Delay(100); 
      } 

      this.bus.Advanced.Workers.SetNumberOfWorkers(0); 

      this.DetermineMessagesWaitingForProcessing(); 
     } 
    } 

    public void DetermineMessagesWaitingForProcessing() { 
     this.messagesWaitingForProcessing = Directory.GetFiles(GetDirectoryForQueueNamed(this.queueName),  "*.rebusmessage.json").Count(); 
    } 

    private static string GetDirectoryForQueueNamed(string queueName) 
    { 
     return Path.Combine(this.baseDiretory, queueName); 
    } 
} 

的測試也能像

[TestMethod] 
public void Test() { 
    var endpoint1 = LaunchExe("1"); 
    var endpoint2 = LaunchExe("2"); 

    endPoint1.DoSomeAction(); 

    endPoint1.WaitForMessagesProcessed(); 

    Assert.AreEqual("expectation", endPoint1.Query()); 
}