2014-03-29 31 views
2

我想弄清楚如何處理n緩衝項目從一個序列然後等待秒前處理下n項目?RX:如何處理序列中的n個緩衝項目,然後在處理下一個n項目之前等待t秒?

下面是我想要做的一個粗略的形式,使用Thread.Sleep()。我想避免Thread.Sleep()並正確執行。

static void Main(string[] args) 
{ 
    var t = Observable.Range(0, 100000); 

    var query = t.Buffer(20);      

    query.ObserveOn(NewThreadScheduler.Default) 
     .Subscribe(x => DoStuff(x)); 

    Console.WriteLine("Press ENTER to exit"); 
    Console.ReadLine(); 

} 

static void DoStuff(IList<int> list) 
{ 
    Console.WriteLine(DateTime.Now); 

    foreach (var value in list) 
    { 
     Console.WriteLine(value); 
    } 

    Thread.Sleep(TimeSpan.FromSeconds(10)); 
} 

任何人可以幫助我找到更多的RX方法嗎?

感謝

閃存

回答

3
// Instantiate this once, we'll use it in a closure multiple times. 
var delay = Observable.Empty<int>().Delay(TimeSpan.FromMilliseconds(10)); 

// start with a source of individual items to be worked. 
Observable.Range(0, 100000) 
    // Create batches of work. 
    .Buffer(20) 
    // Select an observable for the batch of work, and concat a delay. 
    .Select(batch => batch.ToObservable().Concat(delay)) 
    // Concat those together and form a "process, delay, repeat" observable. 
    .Concat() 
    // Subscribe! 
    .Subscribe(Console.WriteLine); 

// Make sure we wait for our work to be done. 
// There are other ways to sync up, like async/await. 
Console.ReadLine(); 

或者,你也可以保持同步使用異步/等待:

static IObservable<int> delay = Observable.Empty<int>().Delay(TimeSpan.FromMilliseconds(100)); 

static async Task Run() 
{ 
    await Observable.Range(0, 1000) 
     .Buffer(20) 
     .Select(batch => batch.ToObservable().Concat(delay)) 
     .Concat() 
     .Do(Console.WriteLine) 
     .LastOrDefaultAsync(); 
} 

是不是delay觀察到了一個漂亮的把戲?它的工作原理是因爲OnCompleted像OnNext一樣被延遲!

1

大廈克里斯托弗的回答如果你不想列表元素平了,你可以做的更多信息:

var delay = Observable.Empty<IList<int>>().Delay(TimeSpan.FromSeconds(10)); 

var query = Observable.Range(0, 100000) 
         .Buffer(20) 
         .Select(batch => Observable.Return(batch).Concat(delay)) 
         .Concat(); 

query.Subscribe(list => 
        { 
         Console.WriteLine(DateTime.Now); 

         foreach (var value in list) 
         { 
          Console.WriteLine(value); 
         } 
        }); 
相關問題