編輯: 你基本上是在尋找一個阻塞運算符。舊的阻止操作符(如ForEach
)已被棄用,以支持異步版本。你要等待的最後一個項目,像這樣:
public async Task TestMethod1()
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Create<int>(async ob =>
{
ob.OnNext(1);
await Task.Delay(1000);
ob.OnNext(2);
await Task.Delay(1000);
ob.OnNext(3);
ob.OnCompleted();
});
observable.Subscribe(i => TestContext.WriteLine($"Sync {i}"));
var selectManyObservable = observable.SelectMany(i => WriteAsync(i).ToObservable()).Publish().RefCount();
selectManyObservable.Subscribe();
await selectManyObservable.LastOrDefaultAsync();
TestContext.WriteLine("Complete.");
}
雖然這將解決您的眼前的問題,它看起來像你要保持運行到由於下面的問題(我增加了兩個)。 Rx在正確使用時功能非常強大,如果沒有使用,就會讓人困惑。
老答案:
有兩件事情:
- 。混合異步/等待和Rx通常會導致越來越兩者的缺陷和沒有好處。
- Rx具有強大的測試功能。你沒有使用它。
- 副作用,如
WriteLine
最好在訂閱中完全執行,而不是像SelectMany
這樣的操作員執行。
- 你可能想要冷卻vs熱的可觀察物。
它沒有跑完成的原因是因爲你的測試跑步者。您的測試運行人員將在TestMethod1
結束時終止測試。 Rx認購將以其他方式生活。當我在Linqpad運行你的代碼,我得到以下的輸出:
開始測試...
同步1
同步2
異步1
同步3
異步2
完成。
異步3
...這是我假設你想看到的,除了你可能想在完成異步3
後使用僅Rx,您的代碼會是這個樣子:
public void TestMethod1()
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Concat<int>(
Observable.Return(1),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
Observable.Return(2),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
Observable.Return(3)
);
var syncOutput = observable
.Select(i => $"Sync {i}");
syncOutput.Subscribe(s => TestContext.WriteLine(s));
var asyncOutput = observable
.SelectMany(i => WriteAsync(i, scheduler));
asyncOutput.Subscribe(s => TestContext.WriteLine(s),() => TestContext.WriteLine("Complete."));
}
public IObservable<string> WriteAsync(int value, IScheduler scheduler)
{
return Observable.Return(value)
.Delay(TimeSpan.FromSeconds(1), scheduler)
.Select(i => $"Async {value}");
}
public static class TestContext
{
public static void WriteLine(string s)
{
Console.WriteLine(s);
}
}
這仍然沒有利用Rx的測試功能。這看起來像這樣:
public void TestMethod1()
{
var scheduler = new TestScheduler();
TestContext.WriteLine("Starting test...");
var observable = Observable.Concat<int>(
Observable.Return(1),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
Observable.Return(2),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
Observable.Return(3)
);
var syncOutput = observable
.Select(i => $"Sync {i}");
syncOutput.Subscribe(s => TestContext.WriteLine(s));
var asyncOutput = observable
.SelectMany(i => WriteAsync(i, scheduler));
asyncOutput.Subscribe(s => TestContext.WriteLine(s),() => TestContext.WriteLine("Complete."));
var asyncExpected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(1000.Ms(), "Async 1"),
ReactiveTest.OnNext(2000.Ms(), "Async 2"),
ReactiveTest.OnNext(3000.Ms(), "Async 3"),
ReactiveTest.OnCompleted<string>(3000.Ms() + 1) //+1 because you can't have two notifications on same tick
);
var syncExpected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0000.Ms(), "Sync 1"),
ReactiveTest.OnNext(1000.Ms(), "Sync 2"),
ReactiveTest.OnNext(2000.Ms(), "Sync 3"),
ReactiveTest.OnCompleted<string>(2000.Ms()) //why no +1 here?
);
var asyncObserver = scheduler.CreateObserver<string>();
asyncOutput.Subscribe(asyncObserver);
var syncObserver = scheduler.CreateObserver<string>();
syncOutput.Subscribe(syncObserver);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
asyncExpected.Messages,
asyncObserver.Messages);
ReactiveAssert.AreElementsEqual(
syncExpected.Messages,
syncObserver.Messages);
}
public static class MyExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
...所以不像你的任務測試,你不必等待。測試立即執行。您可以將延遲時間提高到幾分鐘或幾小時,TestScheduler
將爲您嘲笑時間。然後你的測試運動員可能會很高興。
你知道你正在創建三個獨立的訂閱到底層observable嗎?這是你的意圖嗎?還是你想分享三個觀察者中單個觀察值的值? – Enigmativity
3個訂閱? '等待可觀察'是否也會導致訂閱?在真正的代碼中,原始的可觀察性很熱,所以我可能應該在示例中使它變得很熱門...... –
是的,「await observable」確實會導致第三次訂閱。您的第二次訂閱「observable.SelectMany(i => WriteAsync(i).ToObservable())。Subscribe()'有一個延遲,所以'await observable'在它之前完成。這就是爲什麼你錯過了「Async 3」。 – Enigmativity