2016-09-13 132 views
1

下面的單元測試永遠不會打印「Async 3」,因爲測試首先完成。我怎樣才能確保它完成?我能想到的最好的方法是在最後使用任意的Task.Delay或WriteAsync()。結果既不理想。確保在進程終止之前完成異步OnNext代碼

public async Task TestMethod1() // eg. webjob 
    { 
     TestContext.WriteLine("Starting test..."); 
     var observable = Observable.Create<int>(async ob => 
     { 
      ob.OnNext(1); 
      await Task.Delay(1000); // Fake async REST api call 
      ob.OnNext(2); 
      await Task.Delay(1000); 
      ob.OnNext(3); 
      ob.OnCompleted(); 
     }); 

     observable.Subscribe(i => TestContext.WriteLine($"Sync {i}")); 
     observable.SelectMany(i => WriteAsync(i).ToObservable()).Subscribe(); 

     await observable; 
     TestContext.WriteLine("Complete."); 
    } 

    public async Task WriteAsync(int value) // Fake async DB call 
    { 
     await Task.Delay(1000); 
     TestContext.WriteLine($"Async {value}"); 
    } 

編輯 我意識到提的單元測試很可能misleading.This不是一個測試問題。該代碼模擬了在Azure WebJob中運行的進程的實際問題,生產者和消費者都需要調用一些Async IO。問題在於,在消費者真正完成之前,webjob會完成。這是因爲我無法弄清楚如何正確地等待消費者的任何事情。也許這只是是不可能的RX ...

+0

你知道你正在創建三個獨立的訂閱到底層observable嗎?這是你的意圖嗎?還是你想分享三個觀察者中單個觀察值的值? – Enigmativity

+0

3個訂閱? '等待可觀察'是否也會導致訂閱?在真正的代碼中,原始的可觀察性很熱,所以我可能應該在示例中使它變得很熱門...... –

+0

是的,「await observable」確實會導致第三次訂閱。您的第二次訂閱「observable.SelectMany(i => WriteAsync(i).ToObservable())。Subscribe()'有一個延遲,所以'await observable'在它之前完成。這就是爲什麼你錯過了「Async 3」。 – Enigmativity

回答

1

編輯: 你基本上是在尋找一個阻塞運算符。舊的阻止操作符(如ForEach)已被棄用,以支持異步版本。你要等待的最後一個項目,像這樣:

public async Task TestMethod1() 
{ 
    TestContext.WriteLine("Starting test..."); 
    var observable = Observable.Create<int>(async ob => 
    { 
     ob.OnNext(1); 
     await Task.Delay(1000); 
     ob.OnNext(2); 
     await Task.Delay(1000); 
     ob.OnNext(3); 
     ob.OnCompleted(); 
    }); 

    observable.Subscribe(i => TestContext.WriteLine($"Sync {i}")); 
    var selectManyObservable = observable.SelectMany(i => WriteAsync(i).ToObservable()).Publish().RefCount(); 
    selectManyObservable.Subscribe(); 
    await selectManyObservable.LastOrDefaultAsync(); 
    TestContext.WriteLine("Complete."); 
} 

雖然這將解決您的眼前的問題,它看起來像你要保持運行到由於下面的問題(我增加了兩個)。 Rx在正確使用時功能非常強大,如果沒有使用,就會讓人困惑。

老答案:


有兩件事情:

  1. 。混合異步/等待和Rx通常會導致越來越兩者的缺陷和沒有好處。
  2. Rx具有強大的測試功能。你沒有使用它。
  3. 副作用,如WriteLine最好在訂閱中完全執行,而不是像SelectMany這樣的操作員執行。
  4. 你可能想要冷卻vs熱的可觀察物。
  5. 它沒有跑完成的原因是因爲你的測試跑步者。您的測試運行人員將在TestMethod1結束時終止測試。 Rx認購將以其他方式生活。當我在Linqpad運行你的代碼,我得到以下的輸出:

    開始測試...
    同步1
    同步2
    異步1
    同步3
    異步2
    完成。
    異步3

...這是我假設你想看到的,除了你可能想在完成異步3


後使用僅Rx,您的代碼會是這個樣子:

public void TestMethod1() 
{ 
    TestContext.WriteLine("Starting test..."); 
    var observable = Observable.Concat<int>(
     Observable.Return(1), 
     Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)), 
     Observable.Return(2), 
     Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)), 
     Observable.Return(3) 
    ); 

    var syncOutput = observable 
     .Select(i => $"Sync {i}"); 
    syncOutput.Subscribe(s => TestContext.WriteLine(s)); 

    var asyncOutput = observable 
     .SelectMany(i => WriteAsync(i, scheduler)); 
    asyncOutput.Subscribe(s => TestContext.WriteLine(s),() => TestContext.WriteLine("Complete.")); 
} 

public IObservable<string> WriteAsync(int value, IScheduler scheduler) 
{ 
    return Observable.Return(value) 
     .Delay(TimeSpan.FromSeconds(1), scheduler) 
     .Select(i => $"Async {value}"); 
} 


public static class TestContext 
{ 
    public static void WriteLine(string s) 
    { 
     Console.WriteLine(s); 
    } 
} 

這仍然沒有利用Rx的測試功能。這看起來像這樣:

public void TestMethod1() 
{ 
    var scheduler = new TestScheduler(); 
    TestContext.WriteLine("Starting test..."); 
    var observable = Observable.Concat<int>(
     Observable.Return(1), 
     Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler), 
     Observable.Return(2), 
     Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler), 
     Observable.Return(3) 
    ); 

    var syncOutput = observable 
     .Select(i => $"Sync {i}"); 
    syncOutput.Subscribe(s => TestContext.WriteLine(s)); 

    var asyncOutput = observable 
     .SelectMany(i => WriteAsync(i, scheduler)); 
    asyncOutput.Subscribe(s => TestContext.WriteLine(s),() => TestContext.WriteLine("Complete.")); 

    var asyncExpected = scheduler.CreateColdObservable<string>(
     ReactiveTest.OnNext(1000.Ms(), "Async 1"), 
     ReactiveTest.OnNext(2000.Ms(), "Async 2"), 
     ReactiveTest.OnNext(3000.Ms(), "Async 3"), 
     ReactiveTest.OnCompleted<string>(3000.Ms() + 1) //+1 because you can't have two notifications on same tick 
    ); 

    var syncExpected = scheduler.CreateColdObservable<string>(
     ReactiveTest.OnNext(0000.Ms(), "Sync 1"), 
     ReactiveTest.OnNext(1000.Ms(), "Sync 2"), 
     ReactiveTest.OnNext(2000.Ms(), "Sync 3"), 
     ReactiveTest.OnCompleted<string>(2000.Ms()) //why no +1 here? 
    ); 

    var asyncObserver = scheduler.CreateObserver<string>(); 
    asyncOutput.Subscribe(asyncObserver); 
    var syncObserver = scheduler.CreateObserver<string>(); 
    syncOutput.Subscribe(syncObserver); 
    scheduler.Start(); 
    ReactiveAssert.AreElementsEqual(
     asyncExpected.Messages, 
     asyncObserver.Messages); 

    ReactiveAssert.AreElementsEqual(
     syncExpected.Messages, 
     syncObserver.Messages); 
} 

public static class MyExtensions 
{ 
    public static long Ms(this int ms) 
    { 
     return TimeSpan.FromMilliseconds(ms).Ticks; 
    } 

} 

...所以不像你的任務測試,你不必等待。測試立即執行。您可以將延遲時間提高到幾分鐘或幾小時,TestScheduler將爲您嘲笑時間。然後你的測試運動員可能會很高興。

+0

如果有人能讓我滿意+1標記,我會非常感激。 – Shlomo

+0

我已經添加了一些更詳細的問題,這可能有點模糊 - 這不是一個真正的RX測試問題,雖然這是我想了解更多關於 –

+0

謝謝。編輯答案。 – Shlomo

-1

好了,你可以使用Observable.ForEach阻塞,直到一個IObservable已終止:

observable.ForEach(unusedValue => { }); 

你能TestMethod1正常,非異步方法,和那麼用這個替換await observable;

+0

不,它不幸產生相同的輸出(ForEach也被棄用)。 –

+0

OP已經阻止了'await observable',但第二個訂閱獨立運行,並且因爲它被延遲了,它會在稍後結束。 – Enigmativity

相關問題