我有無限的對象流。而且我的要求是,來自具有相同密鑰的可觀察流中的每個項目應該被同步處理,並且具有不同密鑰的所有其他項目可能/應該並行處理。做到這一點(在大多數地方提到),最簡單的方法是使用GroupByUntil
操作:Rx.NET GroupByUntil組終止,等待線程完成
var results = observableStream
.GroupByUntil(item => item.Id, group =>
group.Throttle(TimeSpan.FromSeconds(30), scheduler))
.SelectMany(group =>
group
.ObserveOn(scheduler)
.Select(item => ProcessItem(item)));
var disposable = results.Subscribe(result => SaveResults(result));
代碼工作很好,直到我可以保證的ProcessItem(item)
的執行時間不超過30秒。否則group.Throttle(TimeSpan.FromSeconds(30), scheduler)
將關閉組的流,並且新項目到達並開始在新線程上處理的可能性很高。
所以基本上我需要知道我的線程已經完成處理所有具有特定鍵的項目,並且我需要在durationSelector
之內通知GroupByUntil
關於它的運算符參數。
有關如何實現此目的的任何想法?提前致謝。
你怎麼知道你已經收到了最後一個特定的密鑰? – NetMage
@NetMage其實我不會知道。我試圖實現的是,只有當處理特定組的線程完成了它的工作並且隊列中再沒有任何東西時,我才應該開始調節(反彈)。 – Azat
'ProcessItem'是否同步?它是「異步」嗎?它是否返回'IObservable'? –
Shlomo