2017-08-03 73 views
2

我有無限的對象流。而且我的要求是,來自具有相同密鑰的可觀察流中的每個項目應該被同步處理,並且具有不同密鑰的所有其他項目可能/應該並行處理。做到這一點(在大多數地方提到),最簡單的方法是使用GroupByUntil操作:Rx.NET GroupByUntil組終止,等待線程完成

var results = observableStream 
    .GroupByUntil(item => item.Id, group => 
     group.Throttle(TimeSpan.FromSeconds(30), scheduler)) 
    .SelectMany(group => 
     group 
      .ObserveOn(scheduler) 
      .Select(item => ProcessItem(item))); 

var disposable = results.Subscribe(result => SaveResults(result)); 

代碼工作很好,直到我可以保證的ProcessItem(item)的執行時間不超過30秒。否則group.Throttle(TimeSpan.FromSeconds(30), scheduler)將關閉組的流,並且新項目到達並開始在新線程上處理的可能性很高。

所以基本上我需要知道我的線程已經完成處理所有具有特定鍵的項目,並且我需要在durationSelector之內通知GroupByUntil關於它的運算符參數。

有關如何實現此目的的任何想法?提前致謝。

+2

你怎麼知道你已經收到了最後一個特定的密鑰? – NetMage

+0

@NetMage其實我不會知道。我試圖實現的是,只有當處理特定組的線程完成了它的工作並且隊列中再沒有任何東西時,我才應該開始調節(反彈)。 – Azat

+0

'ProcessItem'是否同步?它是「異步」嗎?它是否返回'IObservable '? – Shlomo

回答

2

這與此問題非常相似:A way to push buffered events in even intervals

的問答形式對這個問題,有一個運營商Drain

public static class ObservableDrainExtensions 
{ 
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
     Func<TSource, IObservable<TOut>> selector) 
    { 
     return Observable.Defer(() => 
     { 
      BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 

      return source 
       .Zip(queue, (v, q) => v) 
       .SelectMany(v => selector(v) 
        .Do(_ => { },() => queue.OnNext(new Unit())) 
       ); 
     }); 
    } 
} 

鑑於運營商,你的問題就變得非常簡單:

var results = observableStream 
    .GroupBy(item => item.Id) 
    .SelectMany(group => 
     group 
      .ObserveOn(scheduler) 
      .Drain(item => ProcessItem(item))); 

var disposable = results.Subscribe(result => SaveResults(result)); 

由於看起來像A1,A2流, B1,A3,B2,C1,B3,C2,GroupBy通過ID分開流:

A: A1, A2, A3 
B: B1, B2, B3 
C: C1, C2 

...和Drain確保對於給定子流中的項目,它們串行運行,而不是並行運行。

+0

不錯的解決方案,但只使用'GroupBy',這些組不會被銷燬,並且如果有大量的唯一密鑰,我可能會耗盡內存。 – Azat