2012-01-19 32 views
1

我正在測試Reactive Extensions(NuGet的主分支),我在合併時遇到了一些問題。我並行運行多個操作,並希望在所有操作完成時收到通知,但我只是沒有得到它。合併和onCompleted - 如何在所有並行操作完成時獲取通知?

這裏是我的操作,我使用Web客戶端下載一個網頁,然後計算字數:

private IObservable<int> GetWebsiteWordCount(Uri uri) 
    { 
     var client = new WebClient(); 

     var o = Observable.FromEventPattern<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted") 
        .ObserveOn(Scheduler.ThreadPool) 
        .Select(newString => newString.EventArgs.Result.Split(' ').Length); 

     client.DownloadStringAsync(uri); 

     return o; 
    } 

我然後創建很多這樣的:

 var tasks = new List<IObservable<int>>() 
         { 
          GetWebsiteWordCount(new Uri("http://www.google.com", UriKind.Absolute)), 
          GetWebsiteWordCount(new Uri("http://www.bing.com", UriKind.Absolute)), 
          GetWebsiteWordCount(new Uri("http://www.yle.fi", UriKind.Absolute)) 
         }; 

之後,我用的是合併組合這些並嘗試在全部完成時收到通知:

 tasks.Merge() 
      .ObserveOn(SynchronizationContext.Current) 
      .Subscribe(x => Debug.WriteLine(x), ex => Debug.WriteLine("exception thrown"), 
         () => Debug.WriteLine("all ready")); 

所有這些「任務」是正確執行和我在調試窗口的字數預期:

14279 
672 
292 

但我不明白「一切準備就緒」的消息。任何想法我失蹤?

更新:使用的合併而不是

總和我也試圖合併改成這樣:

 var result = from i in tasks.ToObservable() 
        from r in i 
        select r; 

     result.Sum().Subscribe(x => Debug.WriteLine("all ready. sum: " + x)); 

但我從來沒有得到結果返回。

更新:有固定的問題採取

感謝基甸Engelberth現在合併與和選項都工作。該解決方案是通過增加取(1)固定GetWebsiteWordCount法:

private IObservable<int> GetWebsiteWordCount(Uri uri) 
    { 
     var client = new WebClient(); 

     var o = Observable.FromEventPattern<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted") 
        .ObserveOn(Scheduler.ThreadPool) 
        .Select(newString => newString.EventArgs.Result.Split(' ').Length) 
        .Take(1); 

     client.DownloadStringAsync(uri); 

     return o; 
    } 

回答

2

Observable.FromEventPattern無法完成,因爲它沒有辦法知道什麼時候會有沒有更多的事件。因爲你知道這個特定事件是事件異步模式,所以它應該只觸發一次。告訴這個可觀察的,在GetWebsiteWordCount

+0

的某處添加一個.Take(1)謝謝!現在它工作:) –

相關問題