2011-06-03 99 views
0

我有麻煩服用無擴展主題爲<IEnumerable的<Obj>>以主題爲<IEnumerable的<AggregatedObj >>

public Subject<IEnumerable<Person>> PersonDataSubject; 

對象並將其轉換爲:

public Subject<IEnumerable<BornInYear>> BornInYearSubject; 

...使用一些linq聚合。

下面的例子將它放在更多的上下文中,我掙扎的地方在於如何從PersonDataSubject的訂閱中獲取一個IEnumerable到BornInYearSubject中。

無論我嘗試什麼,我最終得到IObservable<BornInYear>,而不是IObservable<IEnumerable<BornInYear>>

目標是讓該類的客戶能夠訂閱兩個主題,並在每個「下一個」通知中獲得相應類型的IEnumerable。

public class ReactiveTest 
{ 
    public class Person 
    { 
     public string name; 
     public DateTime dob; 
    }; 

    public class BornInYear 
    { 
     public int Year; 
     public int Count; 
    } 

    public Subject<IEnumerable<Person>> PersonDataSubject = new Subject<IEnumerable<Person>>(); 
    public Subject<IEnumerable<BornInYear>> BornInYearSubject= new Subject<IEnumerable<BornInYear>>(); 

    public void LoadData() 
    { 
     // Go to hypotheritical web service and get batch of people. 
     IEnumerable<Person> people = WebService.Fetch(); 

     // Notify subscribers we have a fresh batch of data. 
     PersonDataSubject.OnNext(people); 
    } 

    public ReactiveTest() 
    { 
     // Hookup BornInYearSubject to listen to PersonDataSubject and publish the summarised data. 
     PersonDataSubject.Subscribe(pd => pd.GroupBy(p => p.dob.Year) 
              .Select(ps => new BornInYear { Year = ps.Key, Count = ps.Count()}) 
              .AsParallel() 
      ); 

     // How do I get the results of this out and published onto BornInYearSubject? 
    } 
} 

現在我知道我可以使用Task.Factory.StartNew(...)...爲實現這個我訂閱OnNext爲PersonDataSubject但我相信它必須能夠保持更多的反應?

+0

這有什麼錯用的IObservable >? – 2011-06-03 17:20:02

回答

0

好這部作品。感謝這些創意傢伙 - 事後回想起來似乎非常明顯!

using System; 
using System.Collections.Generic; 
using System.Linq; 

namespace TestReactive 
{ 
    public class ReactiveTest 
    { 
     public class Person 
     { 
      public string name; 
      public DateTime dob; 
     }; 

     public class BornInYear 
     { 
      public int Year; 
      public int Count; 
     } 

     public Subject<IEnumerable<Person>> PersonDataSubject = new Subject<IEnumerable<Person>>(); 
     public Subject<IEnumerable<BornInYear>> BornInYearSubject = new Subject<IEnumerable<BornInYear>>(); 

     public void LoadData() 
     { 
      IEnumerable<Person> people = new List<Person> { 
       new Person() {name = "Bill", dob = DateTime.Now.AddYears(-10)}, 
       new Person() {name = "Pete", dob = DateTime.Now.AddYears(-5)}, 
       new Person() {name = "Judy", dob = DateTime.Now.AddYears(-1)}, 
       new Person() {name = "Mike", dob = DateTime.Now.AddYears(-5)}, 
       new Person() {name = "Jake", dob = DateTime.Now.AddYears(-5)}, 
       new Person() {name = "Fred", dob = DateTime.Now.AddYears(-13)}, 
      }; 

      // Notify subscribers we have a fresh batch of data. 
      PersonDataSubject.OnNext(people); 
     } 

     public ReactiveTest() 
     { 
      var subj = PersonDataSubject.Select(pds => pds.GroupBy(pd => pd.dob.Year) 
                  .Select(p => new BornInYear { 
                   Year = p.Key, Count = p.Count() 
                  }).AsParallel()); 
      subj.Subscribe(BornInYearSubject); 

      BornInYearSubject.Subscribe(x=> Console.WriteLine("{0}", x.Count())); 
      LoadData(); 
     } 
    } 

    class Program 
    { 
     static void Main(string[] args) 
     { 
      ReactiveTest rt = new ReactiveTest(); 
     } 
    } 
} 
1

如何:

PersonDataSubject 
    .GroupBy(x => x.Dob.Year) 
    .Select(x => x.Aggregate(new List<BornInYear>(), (acc, x) => { acc.Add(new BornInYear { Year = ps.Key }); return acc; })) 
+0

會聚合返回什麼w/o OnCompleted? – 2011-06-03 19:02:48

+0

@Scott你說得對,我假設PersonDataSubject最終會完成。如果不是,則用'掃描'替換'Aggregate' – 2011-06-03 20:13:13

+0

這不會起作用,因爲PersonDataSubject是IEnumerable 因此您不能直接調用GroupBy,它是需要分組的IEnumerable的內容。給了我一些想法,儘管如此還會繼續發揮。謝謝。 – DanH 2011-06-06 11:04:09

相關問題