2011-12-28 50 views
1

我在Silverlight 4中使用Rx來調用我的數據訪問代碼(在沒有TPL的情況下)。Rx運算符串行運行任務,在末尾收集異常

我想連續進行3個Web服務調用(現在不平行)並獲得結果。我曾經使用SelectMany:

var seq = from a1 in Observable.Return(5).Delay(TimeSpan.FromSeconds(2)) 
      from b1 in Observable.Return(6).Delay(TimeSpan.FromSeconds(2)) 
      from c1 in Observable.Return(7).Delay(TimeSpan.FromSeconds(2)) 
      select new { a1, b1, c1 }; 

但我想第二個和第三個調用仍然會執行,即使第一個有異常。

是否有一個Rx操作符會組合序列,但只有OnException一旦其所有序列都完成了?東西,將在功能上等同於下面的代碼:

using System; 
using System.Collections.Generic; 
using System.Reactive.Concurrency; 
using System.Reactive.Disposables; 
using System.Reactive.Linq; 

namespace ConsoleApplication4 
{ 

    public class Results 
    { 
     public int A { get; set; } 
     public int B { get; set; } 
     public string C { get; set; } 
    } 

    class Program 
    { 
     static void Main(string[] args) 
     { 
      new Program().Test(); 
     } 

     public void Test() 
     { 

      GetResults().SubscribeOn(Scheduler.NewThread).Subscribe(
       results => Console.WriteLine("{0} {1} {2}", results.A, results.B, results.C), 
       ex => Console.WriteLine(ex.ToString()), 
       () => Console.WriteLine("Completed") 
      ); 

      Console.WriteLine("Not blocking"); 

      Console.Read(); 
     } 

     public IObservable<Results> GetResults() 
     { 
      return Observable.Create<Results>(obs => 
       { 

        var a = Observable.Return(5).Delay(TimeSpan.FromSeconds(2)); 
        var b = Observable.Throw<int>(new Exception("uh oh")).Delay(TimeSpan.FromSeconds(2)); 
        var c = Observable.Return("7").Delay(TimeSpan.FromSeconds(2)); 

        var results = new Results(); 
        var exceptions = new List<Exception>(); 

        try 
        { 
         results.A = a.FirstOrDefault(); 
        } 
        catch (Exception ex) 
        { 
         exceptions.Add(ex); 
        } 

        try 
        { 
         results.B = b.FirstOrDefault(); 
        } 
        catch (Exception ex) 
        { 
         exceptions.Add(ex); 
        } 

        try 
        { 
         results.C = c.FirstOrDefault(); 
        } 
        catch (Exception ex) 
        { 
         exceptions.Add(ex); 
        } 

        obs.OnNext(results); 
        if (exceptions.Count > 0) 
         obs.OnError(new AggregateException(exceptions.ToArray())); 
        else 
         obs.OnCompleted(); 
        return Disposable.Empty; 
       }); 
     } 
    } 
} 

回答

1

如何:

var result = Observable.Concat(
    startObservable1().Catch(Observable.Return<TTheType>(null)), 
    startObservable2().Catch(Observable.Return<TTheType>(null)), 
    startObservable3().Catch(Observable.Return<TTheType>(null))); 

是否有一個的Rx操作員將結合序列,但只有onException的一次它的所有序列已經完成?

這部分是有點困難,我使用這個類雖然我不是超級喜歡它:

public class Maybe<T> 
{ 
    public Exception Exception { get; protected set; } 

    T _Value; 
    public T Value { 
     get { 
      if (Exception != null) { 
       throw Exception; 
      } 
      return _Value; 
     } 
     protected set { _Value = value; } 
    } 

    public static Maybe<T> AsError(Exception ex) 
    { 
     return new Maybe<T>() {Value = default(T), Exception = ex}; 
    } 

    public static Maybe<T> AsValue(T value) 
    { 
     return new Maybe<T>() {Value = value}; 
    } 
} 

然後,你可以這樣做:

var result = Observable.Concat(
    startObservable1().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)), 
    startObservable2().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)), 
    startObservable3().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x))); 

你很可能編寫你自己的隱藏Select + Catch的Maybeify()擴展方法。

+0

我無法使用Concat,因爲Web服務沒有返回相同的類型。也許聽起來像個好主意。它看起來像在SL4中實現Task類。 – foson 2012-01-01 20:31:29