2016-07-14 26 views
4

我在這裏有一個簡單的程序,它顯示了各種單詞中的字母數。它按預期工作。對Publish()的行爲造成混淆。Refcount()

static void Main(string[] args) { 
    var word = new Subject<string>(); 
    var wordPub = word.Publish().RefCount(); 
    var length = word.Select(i => i.Length); 
    var report = 
     wordPub 
     .GroupJoin(length, 
      s => wordPub, 
      s => Observable.Empty<int>(), 
      (w, a) => new { Word = w, Lengths = a }) 
     .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j })); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); 
    word.OnNext("Donkey"); 
    word.OnNext("Elephant"); 
    word.OnNext("Zebra"); 
    Console.ReadLine(); 
} 

,輸出是:

Apple 5 
Banana 6 
Cat 3 
Donkey 6 
Elephant 8 
Zebra 5 

我使用的發佈()引用計數(),因爲 「wordpub」 包含在 「報告」 兩次。如果沒有它,首先發佈一個詞的時候,報告的一部分會通過回調得到通知,然後通知報告的另一部分,通知的兩倍。那是怎麼回事?輸出結果有11項而不是6項。至少這是我認爲正在發生的事情。我想在這種情況下使用Publish()。RefCount()同時更新報表的兩個部分。

但是,如果我改變長度的功能也使用已發佈的來源是這樣的:

var length = wordPub.Select(i => i.Length); 

然後輸出是這樣的:

Apple 5 
Apple 6 
Banana 6 
Cat 3 
Banana 3 
Cat 6 
Donkey 6 
Elephant 8 
Donkey 8 
Elephant 5 
Zebra 5 

爲什麼不能長函數也使用相同的公開來源?

回答

3

這是一個很大的挑戰來解決! 微妙的情況發生了。 爲了長久的解釋提前道歉,但忍耐着我!

TL; DR

訂閱到發佈源在順序被處理的,但任何其他訂閱之前直接向未發表的源。即你可以跳過隊列! 使用GroupJoin訂閱訂單很重要,以確定何時打開和關閉窗口。


我的第一個擔心是,你是發佈refcounting一個主題。 這應該是一個無操作。 Subject<T>沒有訂閱費用。

所以,當你刪除Publish().RefCount()

var word = new Subject<string>(); 
var wordPub = word;//.Publish().RefCount(); 
var length = word.Select(i => i.Length); 

那麼你得到了同樣的問題。

那麼我期待GroupJoin(因爲我的直覺暗示Publish().Refcount()是一個紅鯡魚)。 對我來說,單獨看到這一點太難理順,所以我依靠簡單的調試,我也使用了幾年的幾十年 - TraceLog擴展方法。

public interface ILogger 
{ 
    void Log(string input); 
} 
public class DumpLogger : ILogger 
{ 
    public void Log(string input) 
    { 
     //LinqPad `Dump()` extension method. 
     // Could use Console.Write instead. 
     input.Dump(); 
    } 
} 


public static class ObservableLoggingExtensions 
{ 
    private static int _index = 0; 

    public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name) 
    { 
     return Observable.Create<T>(o => 
     { 
      var index = Interlocked.Increment(ref _index); 
      var label = $"{index:0000}{name}"; 
      logger.Log($"{label}.Subscribe()"); 
      var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()")); 
      var subscription = source 
       .Do(
        x => logger.Log($"{label}.OnNext({x.ToString()})"), 
        ex => logger.Log($"{label}.OnError({ex})"), 
        () => logger.Log($"{label}.OnCompleted()") 
       ) 
       .Subscribe(o); 

      return new CompositeDisposable(subscription, disposed); 
     }); 
    } 
} 

當我記錄添加到您提供的代碼,它看起來像這樣:

var logger = new DumpLogger(); 

var word = new Subject<string>(); 
var wordPub = word.Publish().RefCount(); 
var length = word.Select(i => i.Length); 
var report = 
    wordPub.Log(logger, "lhs") 
    .GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"), 
     s => wordPub.Log(logger, "lhsDuration"), 
     s => Observable.Empty<int>().Log(logger, "rhsDuration"), 
     (w, a) => new { Word = w, Lengths = a }) 
    .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j })); 
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext")); 
word.OnNext("Apple"); 
word.OnNext("Banana"); 
word.OnNext("Cat"); 
word.OnNext("Donkey"); 
word.OnNext("Elephant"); 
word.OnNext("Zebra"); 

這一操作將在我的日誌東西輸出像下面

登錄與發佈()。引用計數()使用

0001lhs.Subscribe()    
0002rhs.Subscribe()    
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()  
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()  
0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose()  
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Banana 6 
... 

然而,當我刪除了使用Publish().RefCount()新的日誌輸出如下:

日誌,而不只是主題

0001lhs.Subscribe()     
0002rhs.Subscribe()     
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()   
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()   
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Apple 6 

    OnNext 
    Banana 6 

0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose() 
... 

這給了我們一些啓示,但是當問題真正變得清楚的是,當我們開始註釋我們的日誌具有訂閱的邏輯列表。

與引用計數原始(工作)的代碼我們的註解可能是這樣的

//word.Subsribers.Add(wordPub) 

0001lhs.Subscribe()    //wordPub.Subsribers.Add(0001lhs) 
0002rhs.Subscribe()    //word.Subsribers.Add(0002rhs) 
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()  //wordPub.Subsribers.Add(0003lhsDuration) 
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()  //wordPub.Subsribers.Add(0005lhsDuration) 
0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose()  //wordPub.Subsribers.Remove(0003lhsDuration) 
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Banana 6 

所以在這個例子中,當word.OnNext("Banana");執行觀察員鏈順序鏈接

  1. wordPub
  2. 0002rhs

但是,wordPub有子訂閱! 所以真正的訂閱列表看起來像

  1. wordPub
    1. 0001lhs
    2. 0003lhsDuration
    3. 0005lhsDuration
  2. 0002rhs

如果我們註釋的主題只記錄我們看到的精妙之處在於

0001lhs.Subscribe()     //word.Subsribers.Add(0001lhs) 
0002rhs.Subscribe()     //word.Subsribers.Add(0002rhs) 
0001lhs.OnNext(Apple) 
0003lhsDuration.Subscribe()   //word.Subsribers.Add(0003lhsDuration) 
0002rhs.OnNext(5) 
0004rhsDuration.Subscribe() 
0004rhsDuration.OnCompleted() 
0004rhsDuration.Dispose() 

    OnNext 
    Apple 5 

0001lhs.OnNext(Banana) 
0005lhsDuration.Subscribe()   //word.Subsribers.Add(0005lhsDuration) 
0002rhs.OnNext(6) 
0006rhsDuration.Subscribe() 
0006rhsDuration.OnCompleted() 
0006rhsDuration.Dispose() 

    OnNext 
    Apple 6 

    OnNext 
    Banana 6 

0003lhsDuration.OnNext(Banana) 
0003lhsDuration.Dispose() 

所以在這個例子中,當word.OnNext("Banana");執行觀察員鏈順序鏈接

1. 0001lhs 
2. 0002rhs 
3. 0003lhsDuration 
4. 0005lhsDuration 

由於0003lhsDuration訂閱是在0002rhs之後激活的,它不會看到「Banana」值終止窗口,直到rhs已經發送該值,從而在仍然打開的窗口中產生它。

正如@ francezu13k50指出你的問題很明顯,簡單的解決方法就是使用word.Select(x => new { Word = x, Length = x.Length });,但我認爲你已經向我們提供了真正的問題(感謝)的簡化版本,我明白了爲什麼這不適合。 但是,因爲我不知道你真正的問題空間是什麼,我不知道該怎麼建議你提供解決方案,只是你有一個與你當前的代碼,現在你應該知道爲什麼它的工作方式。

+0

偉大的分析!我將記錄器添加到我的工具箱中。真正的問題空間是使用視圖模型的rx。有些用戶輸入表單的屬性,如電子郵件地址。有關這些屬性的計算,例如電子郵件地址是否有效。我試圖關聯輸入和輸出。例如,我可能會加入電子郵件和isEmailValid。它看起來像關聯是棘手的。也許我會包含輸出與輸出 - isEmailValid將是一個{電子郵件=字符串,IsValid =布爾}所以沒有相關性是必要的。 – JustinM

+0

我爲什麼要加入團隊:在我真正的問題空間中,我有一些慢速驗證。爲了驗證URL,答案開始爲空(未知),所以我可以用「計算...」消息更新UI,然後輸入true/false,這樣我就可以用「好URL」更新UI。或「錯誤的URL!」。 SelectMany允許我扁平化驗證消息以在UI中顯示。所以這實際上是一個非常簡單的問題,並且令人不安的是沒有簡單的答案。我的答案依賴於Publish()。RefCount(),你認爲這是不必要的。 – JustinM

+0

看到我的解決方案涉及掃描。我認爲這完成了我想要的而沒有GroupJoin的不可預測性/詭異性。再次,我試圖在這裏做一些非常簡單的事情 - 關聯一個輸入和相應的輸出。我可能會將它變成擴展方法,以便我可以輕鬆地重用它。 – JustinM

0

RefCount返回一個Observable,只要存在至少一個返回的Observable訂閱,就會保持連接狀態。在處理最後一次訂閱時,RefCount會將其與源的連接處理,並在新訂閱時重新連接。您的報告查詢可能會出現這樣的情況,即在查詢完成之前處理「wordPub」的所有訂閱。

取而代之的是複雜的羣組加入查詢,你可以簡單地做:

var report = word.Select(x => new { Word = x, Length = x.Length }); 

編輯:如果你想使用GroupJoin運營報表查詢 改成這樣:

var report = 
     wordPub 
     .GroupJoin(length, 
      s => wordPub, 
      s => Observable.Empty<int>(), 
      (w, a) => new { Word = w, Lengths = a }) 
     .SelectMany(i => i.Lengths.FirstAsync().Select(j => new { Word = i.Word, Length = j })); 
+0

我意識到有一種更簡單的方式來編寫我的查詢。我只是想了解如何將單獨的流合在一起。 – JustinM

+0

問題是,您將由GroupJoin命名的每個組投影到一系列長度中。您只需將序列剝離到當前長度。 – francezu13k50

0

因爲GroupJoin似乎很難處理,所以這裏是另一種關聯函數輸入和輸出的方法。

static void Main(string[] args) { 
    var word = new Subject<string>(); 
    var length = new Subject<int>(); 
    var report = 
     word 
     .CombineLatest(length, (w, l) => new { Word = w, Length = l }) 
     .Scan((a, b) => new { Word = b.Word, Length = a.Word == b.Word ? b.Length : -1 }) 
     .Where(i => i.Length != -1); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); length.OnNext(5); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); length.OnNext(3); 
    word.OnNext("Donkey"); 
    word.OnNext("Elephant"); length.OnNext(8); 
    word.OnNext("Zebra"); length.OnNext(5); 
    Console.ReadLine(); 
} 

這種方法工作,如果每一個輸入具有0個或多個的輸出受到限制:(1)在相同的順序的輸入,(2)的每個輸出對應於最近的輸入而輸出僅到達。這就像一個LeftJoin - 第一個列表(單詞)中的每個項目與隨後到達的右側列表(長度)中的項目配對,直到發送第一個列表中的另一個項目。

0

試圖使用常規聯接而不是GroupJoin。我認爲問題在於,當創建一個新詞時,在Join內部存在競爭條件,從而創建一個新窗口並結束當前窗口。所以在這裏我試圖通過將每個單詞與表示窗口末端的零對相匹配來減少這個問題。不起作用,就像第一個版本沒有。如果沒有先前關閉的每個單詞創建一個新窗口,怎麼可能?完全困惑。

static void Main(string[] args) { 
    var lgr = new DelegateLogger(Console.WriteLine); 
    var word = new Subject<string>(); 
    var wordDelimited = 
     word 
     .Select(i => Observable.Return<string>(null).StartWith(i)) 
     .SelectMany(i => i); 
    var wordStart = wordDelimited.Where(i => i != null); 
    var wordEnd = wordDelimited.Where(i => i == null); 
    var report = Observable 
     .Join(
      wordStart.Log(lgr, "word"), // starts window 
      wordStart.Select(i => i.Length), 
      s => wordEnd.Log(lgr, "expireWord"), // ends current window 
      s => Observable.Empty<int>(), 
      (l, r) => new { Word = l, Length = r }); 
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}")); 
    word.OnNext("Apple"); 
    word.OnNext("Banana"); 
    word.OnNext("Cat"); 
    word.OnNext("Zebra"); 
    word.OnNext("Elephant"); 
    word.OnNext("Bear"); 
    Console.ReadLine(); 
}