2013-03-07

Question: Rx GroupBy until a condition changes

I'm stuck with Rx and a specific query.

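A continuous stream produces many single update operations. An operation can be either an insert or a delete. I want to buffer this stream and execute the operations in small batches, but preserving their order is very important. The operations should also be buffered, and each batch should be completed (flushed) in sequence every X seconds.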

Example:

In:

insert-insert-insert-delete-delete-insert-delete-delete-delete-delete 

Out:

insert(3)-delete(2)-insert(1)-delete(4) 
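To pin down the expected behaviour, here is a minimal Rx-free sketch of the transformation on the example above: consecutive operations of the same kind are collapsed into a single "kind(count)" entry and the original order is kept. It works on plain strings instead of UpdateOperation objects and ignores the time-based buffering entirely; the names RunLengthExample and Collapse are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration: collapses consecutive equal operation names into
// "name(count)" runs while keeping the original order. Operations are plain
// strings here; the real code works with UpdateOperation objects.
public static class RunLengthExample
{
    public static string Collapse(IEnumerable<string> ops)
    {
        var runs = new List<string>();
        string current = null;
        int count = 0;
        foreach (var op in ops)
        {
            if (op == current)
            {
                // Same kind as the previous operation: extend the current run.
                count++;
                continue;
            }
            // Kind changed: close the previous run (if any) and start a new one.
            if (current != null)
                runs.Add($"{current}({count})");
            current = op;
            count = 1;
        }
        // Close the final run.
        if (current != null)
            runs.Add($"{current}({count})");
        return string.Join("-", runs);
    }

    public static void Main()
    {
        var input = "insert-insert-insert-delete-delete-insert-delete-delete-delete-delete";
        // prints insert(3)-delete(2)-insert(1)-delete(4)
        Console.WriteLine(Collapse(input.Split('-')));
    }
}
```

A plain loop is used on purpose: LINQ's GroupBy (like Rx's GroupBy) collects all items with the same key regardless of their position, which is exactly what loses the insert/delete interleaving.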

I wrote a simple application to test it. It works more or less as I want, but it does not respect the incoming insert/delete order:

namespace RxTests 
{ 
using System; 
using System.Collections.Generic; 
using System.Globalization; 
using System.Linq; 
using System.Reactive.Concurrency; 
using System.Reactive.Linq; 
using System.Reactive.Subjects; 
using System.Text; 
using System.Threading; 

internal class Program 
{ 
    private static readonly Random Random = new Random(); 

    private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource(); 

    private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>(); 

    private static void Main(string[] args) 
    { 
     Console.WriteLine("Starting production"); 
     var producerScheduler = new EventLoopScheduler(); 
     var consumerScheduler = new EventLoopScheduler(); 
     var producer = 
      Observable.Interval(TimeSpan.FromSeconds(2)) 
         .SubscribeOn(producerScheduler) 
         .Subscribe(Produce, WriteProductionCompleted); 
     var consumer = 
      operations.ObserveOn(producerScheduler) 
         .GroupBy(operation => operation.Delete) 
         .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50)) 
         .SubscribeOn(consumerScheduler) 
         .Subscribe(WriteUpdateOperations); 
     Console.WriteLine("Type any key to stop"); 
     Console.ReadKey(); 
     consumer.Dispose(); 
     producer.Dispose(); 
    } 

    private static void Produce(long time) 
    { 
     var delete = Random.NextDouble() < 0.5; 
     Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time); 
     var idString = (time + 1).ToString(CultureInfo.InvariantCulture); 
     var id = time + 1; 
     operations.OnNext(
      new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture))); 
    } 

    private static void WriteProductionCompleted() 
    { 
     Console.WriteLine("Production completed"); 
     ProducerStopped.Cancel(); 
    } 

    private static void WriteUpdateOperation(UpdateOperation updateOperation) 
    { 
     Console.WriteLine("Consuming {0}", updateOperation); 
    } 

    private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation) 
    { 
     foreach (var operation in updateOperation) 
     { 
      WriteUpdateOperation(operation); 
     } 
    } 

    private class UpdateOperation 
    { 
     public UpdateOperation(long id, bool delete, params string[] changes) 
     { 
      this.Id = id; 
      this.Delete = delete; 
      this.Changes = new List<string>(changes ?? Enumerable.Empty<string>()); 
     } 

     public bool Delete { get; set; } 

     public long Id { get; private set; } 

     public IList<string> Changes { get; private set; } 

     public override string ToString() 
     { 
      var stringBuilder = new StringBuilder("{UpdateOperation "); 
      stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete); 
      if (this.Changes.Count > 0) 
      { 
       stringBuilder.Append(this.Changes.First()); 
       foreach (var change in this.Changes.Skip(1)) 
       { 
        stringBuilder.AppendFormat(", {0}", change); 
       } 
      } 

      stringBuilder.Append("]}"); 
      return stringBuilder.ToString(); 
     } 
    } 
} 

}

Can anyone help me with the correct query?

Thanks

UPDATE 2013-03-08 (as suggested by JerKimball)

I print the results with the following small changes/additions to JerKimball's code:

using(query.Subscribe(Print)) 
{ 
    Console.ReadLine(); 
    producer.Dispose();   
} 

using the following Print methods:

private static void Print(IObservable<IList<Operation>> operations) 
{ 
    operations.Subscribe(Print); 
} 

private static void Print(IList<Operation> operations) 
{ 
    var stringBuilder = new StringBuilder("["); 
    if (operations.Count > 0) 
    { 
     stringBuilder.Append(operations.First()); 
     foreach (var item in operations.Skip(1)) 
     { 
      stringBuilder.AppendFormat(", {0}", item); 
     } 
    } 

    stringBuilder.Append("]"); 
    Console.WriteLine(stringBuilder); 
} 

and the following ToString for Operation:

public override string ToString() 
{ 
    return string.Format("{0}:{1}", this.Type, this.Seq); 
} 

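The order is preserved, but:

  • I'm not sure about subscribing inside another subscription: is this correct? (This is something I have wondered about for a long time and it is still not clear to me.)
  • I never get more than two elements in each list (even when the stream produces more than two consecutive values of the same type).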

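'I'm not sure about subscribing inside another subscription: is this correct' <- what do you mean? Where is this happening? (EDIT) Oh, I see - in your 'Print'... yes, you don't want to do that, if only because you're now leaking an 'IDisposable' – JerKimball 2013-03-08 20:14:20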


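'Subscribe' has an overload that takes an 'Action<T>' - use that; that's why I used 'Console.WriteLine' in my example. If you change the signature of 'Print' to take an 'IList<Operation>', you can do 'using(query.Subscribe(Print))' – JerKimball 2013-03-08 20:17:12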


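From Select I get an 'IObservable<IObservable<IList<Operation>>>', which is why I used the first 'Print(IObservable<IList<Operation>> operations)'. So I would have to wrap the second 'operations.Subscribe(Print)' in a 'using' as well – fra 2013-03-08 20:22:01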

Answers


Let's try a different approach (hence the new answer):

First, let's define an extension method that "collapses" a list of items into runs based on a key, while preserving order:

using System;
using System.Collections.Generic;

public static class Ext 
{ 
    public static IEnumerable<List<T>> ToRuns<T, TKey>(
      this IEnumerable<T> source, 
      Func<T, TKey> keySelector) 
    { 
     using (var enumerator = source.GetEnumerator()) 
     { 
      if (!enumerator.MoveNext()) 
       yield break; 

      var currentSet = new List<T>(); 

      // inspect the first item 
      var lastKey = keySelector(enumerator.Current); 
      currentSet.Add(enumerator.Current); 

      while (enumerator.MoveNext()) 
      { 
       var newKey = keySelector(enumerator.Current); 
       if (!Equals(newKey, lastKey)) 
       { 
        // A difference == new run; return what we've got thus far 
        yield return currentSet; 
        lastKey = newKey; 
        currentSet = new List<T>(); 
       } 
       currentSet.Add(enumerator.Current); 
      } 

      // Return the last run. 
      yield return currentSet; 

     } 
    } 
} 

Pretty simple: given an IEnumerable<T>, it returns an IEnumerable<List<T>> in which each sublist is a run of consecutive items that share the same key.

Now, to feed it and use it:

var rnd = new Random(); 
var fakeSource = new Subject<Operation>(); 
var producer = Observable 
    .Interval(TimeSpan.FromMilliseconds(1000)) 
    .Subscribe(i => 
     { 
      var op = new Operation(); 
      op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete"; 
      fakeSource.OnNext(op); 
     });  

var singleSource = fakeSource 
    .Publish().RefCount(); 

var query = singleSource 
    // change this value to alter your "look at" time window 
    .Buffer(TimeSpan.FromSeconds(5))  
    .Select(buff => buff.ToRuns(op => op.Type).Where(run => run.Count > 0)); 

using(query.Subscribe(batch => 
{ 
    foreach(var item in batch) 
    { 
     Console.WriteLine("{0}({1})", item.First().Type, item.Count); 
    } 
})) 
{ 
    Console.ReadLine(); 
    producer.Dispose();  
} 

Give it a spin - this is what I see on a typical run:

insert(4) 
delete(2) 
insert(1) 
delete(1) 
insert(1) 
insert(1) 
delete(1) 
insert(1) 
delete(2) 
delete(2) 
insert(2) 
delete(1) 
insert(1) 
delete(2) 
insert(2) 
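In the output above the same type sometimes appears on two consecutive lines (for example 'insert(1)' followed by 'insert(1)'). That follows from the query: runs are collapsed only inside each 5-second buffer, so a run that straddles a buffer boundary is reported once per buffer. The Rx-free sketch below simulates this under simplifying assumptions: one operation per second, so each buffer holds exactly five items. The data and the names WindowedRunsExample and CollapsePerWindow are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical Rx-free simulation: operations arrive once per second and are cut
// into fixed windows of `windowSize` items, mimicking Buffer(TimeSpan.FromSeconds(5)).
// Runs are collapsed inside each window only, as in the answer's query.
public static class WindowedRunsExample
{
    public static List<string> CollapsePerWindow(IList<string> ops, int windowSize)
    {
        if (windowSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(windowSize));

        var lines = new List<string>();
        for (int start = 0; start < ops.Count; start += windowSize)
        {
            // One "buffer": the operations that arrived during this window.
            var window = ops.Skip(start).Take(windowSize).ToList();
            int i = 0;
            while (i < window.Count)
            {
                // Advance j to the end of the run that starts at i.
                int j = i;
                while (j < window.Count && window[j] == window[i]) j++;
                lines.Add($"{window[i]}({j - i})");
                i = j;
            }
        }
        return lines;
    }

    public static void Main()
    {
        // Items 0-4 fall into the first window, items 5-9 into the second.
        var ops = new[] { "delete", "delete", "insert", "insert", "insert",
                          "insert", "delete", "delete", "delete", "insert" };
        // The four consecutive inserts are split across the boundary, so this prints
        // delete(2), insert(3), insert(1), delete(3), insert(1) on separate lines.
        foreach (var line in CollapsePerWindow(ops, 5))
            Console.WriteLine(line);
    }
}
```

If a run should never be split across buffers, the last run of each batch would have to be carried over to the next one; the query as written does not do that.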

Thank you very much! This is exactly what I needed. I'll give you the well-deserved bounty! – fra 2013-03-16 15:48:21


The way I see it, you can get what you're after with a mix of GroupByUntil, DistinctUntilChanged and Buffer.

This will need some tweaking to fit your sample code, but the query (and the concept) still holds:

(EDIT: D'oh - missed a bit there...)

void Main() 
{ 
    var rnd = new Random(); 
    var fakeSource = new Subject<Operation>(); 
    var producer = Observable 
     .Interval(TimeSpan.FromMilliseconds(1000)) 
     .Subscribe(i => 
      { 
       var op = new Operation(); 
       op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete"; 
       fakeSource.OnNext(op); 
      });  
    var singleSource = fakeSource.Publish().RefCount(); 

    var query = singleSource 
     // We want to groupby until we see a change in the source 
     .GroupByUntil(
       i => i.Type, 
       grp => singleSource.DistinctUntilChanged(op => op.Type)) 
     // then buffer up those observed events in the groupby window 
     .Select(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50)); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     Console.ReadLine(); 
     producer.Dispose();   
    } 
} 

public class Operation { 
    private static int _cnt = 0; 
    public Operation() { Seq = _cnt++; } 
    public int Seq {get; set;} 
    public string Type {get; set;}  
} 

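Hi! Thanks, but it doesn't give me the expected output. I actually think GroupBy does not fit my needs, because I need to switch between the two streams. Could you check the output of your solution? Thanks – fra 2013-03-08 18:48:04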


@fra Ha - missed a part... *sigh* I'll edit... OK, try the updated version? – JerKimball 2013-03-08 19:20:44


Thanks a lot, we're almost there. I still have some doubts and issues; please check my updated question – fra 2013-03-08 20:07:29