2010-03-24 62 views
0

我在Reactive extensions框架中有一個IObservable [下面示例中的命名行],我想爲它觀察到的每個對象添加索引編號。從無效擴展框架中壓縮Rx IObservable

我一直在努力,實現這個使用Zip功能:

rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
    new { Row = row, Index = index }) 
    .Subscribe(a => ProcessRow(a.Row, a.Index),() => Completed()); 

..但不幸的是,這將引發

ArgumentOutOfRangeException: 指定參數超出有效值的範圍。 參數名稱:一次性使用

我是否理解Zip函數有誤或者是否存在與我的代碼有關的問題?

代碼的範圍部分似乎不是問題,並且IObservable尚未收到任何事件。

+0

我沒有收到這個異常...什麼是行的類型,IEnumerable,List,IObservable? 我發佈了我的代碼,因爲該評論無法接受它...我的遺漏導致你的拋出? –

回答

0

顯然,Zip擴展方法將原始的自定義IObservable轉換爲匿名可觀察對象,併爲其創建一個System.Collections.Generic.AnonymousObserver,它不會實現IDisposable。 因此,你無法實現的訂閱方法以正常的方式(至少我見過的方式,它使用),這是

public IDisposable Subscribe(IObserver<T> observer) { 
    // ..add to observer list.. 
    return observer as IDisposable 
} 

更可能是正確的答案應該是:

return Disposable.Create(() => Observers.Remove(observer)); 

你應該但請注意,collction可能都靈完成法進行修改,所以處理它們之前創建列表的副本:

public void Completed() 
{ 
    foreach (var observer in Observers.ToList()) 
    { 
     observer.OnCompleted(); 
    } 
} 
0

我不知道你是什麼問題是,這是否對你的工作(和缺少了什麼在這裏,你在做什麼?):

static void Main(string[] args) 
    { 
     var rows = new List<int> { 4,5,1,2,5 }.ToObservable(); 
     rows.Zip(Enumerable.Range(1, int.MaxValue), (row, index) => 
      new { Row = row, Index = index }) 
      .Subscribe(a => ProcessRow(a.Row, a.Index),() => Completed()); 

     Console.ReadLine(); 
    } 
    static void ProcessRow(int row, int index) { 
     Console.WriteLine("Row {0}, Index {1}", row, index); 
    } 
    static void Completed() { 
    } 
+0

問題出在我的IDisposable unsubscriber創建。我從某處愚蠢地複製/粘貼了一個樣本,這對我的情況來說是錯誤的。如上所述,我應該使用Disposable.Create函數,但是我使用它的方式返回了空IDisposable,這就是爲什麼它拋出了一個奇怪的異常。 –

+0

好的感謝您的更新。 –

1

。選擇具有過載,包括指數:

rows.Select((row, index) => new { row, index });