這是一個很大的挑戰來解決! 微妙的情況發生了。 爲了長久的解釋提前道歉,但忍耐着我!
TL; DR
訂閱到發佈源在順序被處理的,但任何其他訂閱之前直接向未發表的源。即你可以跳過隊列! 使用GroupJoin
訂閱訂單很重要,以確定何時打開和關閉窗口。
我的第一個擔心是,你是發佈refcounting一個主題。 這應該是一個無操作。 Subject<T>
沒有訂閱費用。
所以,當你刪除Publish().RefCount()
:
var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);
那麼你得到了同樣的問題。
那麼我期待GroupJoin
(因爲我的直覺暗示Publish().Refcount()
是一個紅鯡魚)。 對我來說,單獨看到這一點太難理順,所以我依靠簡單的調試,我也使用了幾年的幾十年 - Trace
或Log
擴展方法。
public interface ILogger
{
void Log(string input);
}
public class DumpLogger : ILogger
{
public void Log(string input)
{
//LinqPad `Dump()` extension method.
// Could use Console.Write instead.
input.Dump();
}
}
public static class ObservableLoggingExtensions
{
private static int _index = 0;
public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
{
return Observable.Create<T>(o =>
{
var index = Interlocked.Increment(ref _index);
var label = $"{index:0000}{name}";
logger.Log($"{label}.Subscribe()");
var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
var subscription = source
.Do(
x => logger.Log($"{label}.OnNext({x.ToString()})"),
ex => logger.Log($"{label}.OnError({ex})"),
() => logger.Log($"{label}.OnCompleted()")
)
.Subscribe(o);
return new CompositeDisposable(subscription, disposed);
});
}
}
當我記錄添加到您提供的代碼,它看起來像這樣:
var logger = new DumpLogger();
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub.Log(logger, "lhs")
.GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
s => wordPub.Log(logger, "lhsDuration"),
s => Observable.Empty<int>().Log(logger, "rhsDuration"),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
這一操作將在我的日誌東西輸出像下面
登錄與發佈()。引用計數()使用
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
...
然而,當我刪除了使用Publish().RefCount()
新的日誌輸出如下:
日誌,而不只是主題
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...
這給了我們一些啓示,但是當問題真正變得清楚的是,當我們開始註釋我們的日誌具有訂閱的邏輯列表。
與引用計數原始(工作)的代碼我們的註解可能是這樣的
//word.Subsribers.Add(wordPub)
0001lhs.Subscribe() //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose() //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
所以在這個例子中,當word.OnNext("Banana");
執行觀察員鏈順序鏈接
- wordPub
- 0002rhs
但是,wordPub有子訂閱! 所以真正的訂閱列表看起來像
- wordPub
- 0001lhs
-
0003lhsDuration
- 0005lhsDuration
- 0002rhs
如果我們註釋的主題只記錄我們看到的精妙之處在於
0001lhs.Subscribe() //word.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //word.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //word.Subsribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
所以在這個例子中,當word.OnNext("Banana");
執行觀察員鏈順序鏈接
1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration
由於0003lhsDuration
訂閱是在0002rhs
之後激活的,它不會看到「Banana」值終止窗口,直到rhs已經發送該值,從而在仍然打開的窗口中產生它。
呼
正如@ francezu13k50指出你的問題很明顯,簡單的解決方法就是使用word.Select(x => new { Word = x, Length = x.Length });
,但我認爲你已經向我們提供了真正的問題(感謝)的簡化版本,我明白了爲什麼這不適合。 但是,因爲我不知道你真正的問題空間是什麼,我不知道該怎麼建議你提供解決方案,只是你有一個與你當前的代碼,現在你應該知道爲什麼它的工作方式。
偉大的分析!我將記錄器添加到我的工具箱中。真正的問題空間是使用視圖模型的rx。有些用戶輸入表單的屬性,如電子郵件地址。有關這些屬性的計算,例如電子郵件地址是否有效。我試圖關聯輸入和輸出。例如,我可能會加入電子郵件和isEmailValid。它看起來像關聯是棘手的。也許我會包含輸出與輸出 - isEmailValid將是一個{電子郵件=字符串,IsValid =布爾}所以沒有相關性是必要的。 – JustinM
我爲什麼要加入團隊:在我真正的問題空間中,我有一些慢速驗證。爲了驗證URL,答案開始爲空(未知),所以我可以用「計算...」消息更新UI,然後輸入true/false,這樣我就可以用「好URL」更新UI。或「錯誤的URL!」。 SelectMany允許我扁平化驗證消息以在UI中顯示。所以這實際上是一個非常簡單的問題,並且令人不安的是沒有簡單的答案。我的答案依賴於Publish()。RefCount(),你認爲這是不必要的。 – JustinM
看到我的解決方案涉及掃描。我認爲這完成了我想要的而沒有GroupJoin的不可預測性/詭異性。再次,我試圖在這裏做一些非常簡單的事情 - 關聯一個輸入和相應的輸出。我可能會將它變成擴展方法,以便我可以輕鬆地重用它。 – JustinM