2011-10-30 94 views
1

我學習無擴展,我一直在試圖找出如果它是這樣的任務的匹配一批請求。處理與無擴展

我有加工成批請求作爲一個工作單元,並調用回調當所有請求已經完成的進程()方法。

這裏最重要的是,每個請求將調用回調函數同步或異步取決於它的實現,並批處理器必須能夠同時處理。

但沒有線程正在從批處理器開始,任何新的螺紋(或其它異步執行)將被從請求處理程序內如果有必要啓動。我不知道這是否符合rx的用例。

我當前工作的代碼看起來(幾乎)是這樣的:

public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted) 
{ 
    IUnitOfWork uow = null; 
    try 
    { 
     uow = unitOfWorkFactory.Create(); 

     var responses = new List<IResponse>(); 
     var outstandingRequests = requests.Count; 
     foreach (var request in requests) 
     { 
      var correlationId = request.CorrelationId; 
      Action<IResponse> requestCallback = response => 
      { 
       response.CorrelationId = correlationId; 
       responses.Add(response); 
       outstandingRequests--; 
       if (outstandingRequests != 0) 
        return; 

       uow.Commit(); 
       onCompleted(responses); 
      }; 

      requestProcessor.Process(request, requestCallback); 
     } 
    } 
    catch(Exception) 
    { 
     if (uow != null) 
      uow.Rollback(); 
    } 

    if (uow != null) 
     uow.Commit(); 
}   

你將如何實現這一點使用RX?這是合理的嗎?

注意,這項工作的單位將被同步提交即使有還未返回異步請求。

回答

3

我的辦法處理這一是兩步。

首先創建一個通用的運營商,輪流Action<T, Action<R>>Func<T, IObservable<R>>

public static class ObservableEx 
{ 
    public static Func<T, IObservable<R>> FromAsyncCallbackPattern<T, R>(
     this Action<T, Action<R>> call) 
    { 
     if (call == null) throw new ArgumentNullException("call"); 
     return t => 
     { 
      var subject = new AsyncSubject<R>(); 
      try 
      { 
       Action<R> callback = r => 
       { 
        subject.OnNext(r); 
        subject.OnCompleted(); 
       }; 
       call(t, callback); 
      } 
      catch (Exception ex) 
      { 
       return Observable.Throw<R>(ex, Scheduler.ThreadPool); 
      } 
      return subject.AsObservable<R>(); 
     }; 
    } 
} 

接下來,並將該呼叫void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)IObservable<IResponse> Process(IObservable<IRequest> requests)

public IObservable<IResponse> Process(IObservable<IRequest> requests) 
{ 
    Func<IRequest, IObservable<IResponse>> rq2rp = 
     ObservableEx.FromAsyncCallbackPattern 
      <IRequest, IResponse>(requestProcessor.Process); 

    var query = (
     from rq in requests 
     select rq2rp(rq)).Concat(); 

    var uow = unitOfWorkFactory.Create(); 
    var subject = new ReplaySubject<IResponse>(); 

    query.Subscribe(
     r => subject.OnNext(r), 
     ex => 
     { 
      uow.Rollback(); 
      subject.OnError(ex); 
     }, 
     () => 
     { 
      uow.Commit(); 
      subject.OnCompleted(); 
     }); 

    return subject.AsObservable(); 
} 

現在,這不僅運行處理異步,但它也確保了結果的正確順序。

事實上,因爲你開始集合,你甚至可以做到這一點:

var rqs = requests.ToObservable(); 

var rqrps = rqs.Zip(Process(rqs), 
    (rq, rp) => new 
    { 
     Request = rq, 
     Response = rp, 
    }); 

那麼你將有一個可觀察的是對了,而不需要一個CorrelationId屬性每個請求/響應。

我希望這會有所幫助。

+0

這的確非常有幫助!感謝您提供非常詳盡的答案。唯一讓我無法理解的是,當所有請求都得到迴應時,我該如何「做些什麼」?在zip之後我該怎麼說:現在所有的請求都被響應,用這些請求/響應對調用這些方法......我嘗試訂閱「rqrps」observable,在列表中收集響應並調用方法完成後,但沒有成功(在所有這些之後,收集列表中的響應似乎有點奇怪)。我真的很感激:) – asgerhallas

+0

doh! ......稱爲錯誤的超載:) – asgerhallas

+0

@asgerhallas - 你不需要等待所有的對子回來 - 只要在他們回來時開始處理它們。否則看看'.ToArray()'可觀察的運算符 - 它將'IObservable '變成'IObservable ' - 將n個值的可觀察值轉換爲具有n個元素的一個數組的可觀察值,並且只有在成功時。聽起來像是您需要的完美運營商。 – Enigmativity

1

這是Rx的天才的一部分,你可以自由地同步或異步返回結果:

public IObservable<int> AddNumbers(int a, int b) { 
    return Observable.Return(a + b); 
} 

public IObservable<int> AddNumbersAsync(int a, int b) { 
    return Observable.Start(() => a + b, Scheduler.NewThread); 
} 

它們都有的IObservable類型,所以他們同樣工作。如果你想要找出何時完成所有IObservables,總結會做到這一點,因爲它會變成「否」可觀察到的項目成在年底返回項目:

IObservable<int> listOfObservables[]; 

listObservables.ToObservable() 
    .Merge() 
    .Aggregate(0, (acc, x) => acc+1) 
    .Subscribe(x => Console.WriteLine("{0} items were run", x)); 
+0

非常好,謝謝!現在我想知道,最簡單的方法是什麼?如果我應該採用當前的requestProcessor.Process()來接收回調並將該回調的調用轉換爲正在觀察的內容? – asgerhallas

+0

哦...發現這個http://stackoverflow.com/questions/5827178/using-rx-framework-for-async-calls-using-the-void-asyncmethodactiont-callback。這似乎是正確的方向。 – asgerhallas