我學習無擴展,我一直在試圖找出如果它是這樣的任務的匹配一批請求。處理與無擴展
我有加工成批請求作爲一個工作單元,並調用回調當所有請求已經完成的進程()方法。
這裏最重要的是,每個請求將調用回調函數同步或異步取決於它的實現,並批處理器必須能夠同時處理。
但沒有線程正在從批處理器開始,任何新的螺紋(或其它異步執行)將被從請求處理程序內如果有必要啓動。我不知道這是否符合rx的用例。
我當前工作的代碼看起來(幾乎)是這樣的:
public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)
{
IUnitOfWork uow = null;
try
{
uow = unitOfWorkFactory.Create();
var responses = new List<IResponse>();
var outstandingRequests = requests.Count;
foreach (var request in requests)
{
var correlationId = request.CorrelationId;
Action<IResponse> requestCallback = response =>
{
response.CorrelationId = correlationId;
responses.Add(response);
outstandingRequests--;
if (outstandingRequests != 0)
return;
uow.Commit();
onCompleted(responses);
};
requestProcessor.Process(request, requestCallback);
}
}
catch(Exception)
{
if (uow != null)
uow.Rollback();
}
if (uow != null)
uow.Commit();
}
你將如何實現這一點使用RX?這是合理的嗎?
注意,這項工作的單位將被同步提交即使有還未返回異步請求。
這的確非常有幫助!感謝您提供非常詳盡的答案。唯一讓我無法理解的是,當所有請求都得到迴應時,我該如何「做些什麼」?在zip之後我該怎麼說:現在所有的請求都被響應,用這些請求/響應對調用這些方法......我嘗試訂閱「rqrps」observable,在列表中收集響應並調用方法完成後,但沒有成功(在所有這些之後,收集列表中的響應似乎有點奇怪)。我真的很感激:) – asgerhallas
doh! ......稱爲錯誤的超載:) – asgerhallas
@asgerhallas - 你不需要等待所有的對子回來 - 只要在他們回來時開始處理它們。否則看看'.ToArray()'可觀察的運算符 - 它將'IObservable'變成'IObservable ' - 將n個值的可觀察值轉換爲具有n個元素的一個數組的可觀察值,並且只有在成功時。聽起來像是您需要的完美運營商。 –
Enigmativity