2015-11-06 45 views
4

我有以下API:在Rx.Net中,我如何實現一個可觀察的反饋循環,直到反饋耗盡?

IObservable<IList<SqlDataRecord>> WriteToDBAndGetFailedSource(SqlConnection conn, IList<SqlDataRecord> batch) 

它試圖寫分批進入數據庫。如果失敗,則返回整個批次,否則返回的觀察值爲空。

我也有一個源生產批次:

IObservable<IList<SqlDataRecord>> GetDataSource(string filePath, int bufferThreshold) 

現在,我可以像這樣將它們組合起來:

var failedBatchesSource = GetDataSource(filePath, 1048576) 
    .Select(batch => WriteToDBAndGetFailedSource(conn, batch)) 
    .Merge(100); 

這會寫所有批次(最多100併發)並返回可觀察到的失敗批次。

我真正想要的是在一定暫停之後將失敗的批次送回到批次源,可能是原始源仍在生成批次。我可以,當然,寫的是這樣的:

var failedBatchesSource = GetDataSource(filePath, 1048576) 
    .Select(batch => WriteToDBAndGetFailedSource(conn, batch)) 
    .Merge(100) 
    .Select(batch => WriteToDBAndGetFailedSource(conn, batch)) 
    .Merge(100); 

但它是錯的,當然,這是因爲:

  1. 這打破了有一個停頓之前,不合格批次再次處理的要求。
  2. 它可能會向數據庫生成超過100個併發寫入請求。
  3. 這就像展開一個迭代次數未知的for循環 - 非生產性的。

我也可以打出來的可觀察的單子一旦我收集了所有的失敗和重新開始一個循環中:

  var src = GetDataSource(filePath, 1048576); 

      for (;;) 
      { 
       var failed = await src 
        .Select(batch => WriteToDBAndGetFailedSource(conn, batch)) 
        .Merge(100) 
        .ToList(); 
       if (failed.Count == 0) 
       { 
        break; 
       } 
       src = failed.ToObservable(); 
      } 

但不知而內的停留,我可以做的更好可觀察的monad。

回答

0

使用Observable.Buffer.這允許您緩衝,直到您有100條記錄要發送,或者直到X時間過去。

或者,Observable.Interval只會觸發每X時間跨度。處理髮布事件時,您可以添加併發限制。

只要有待發表的物品,這些物品都應該反覆發射。

+0

魔鬼在細節。可以給出一個代碼示例? – mark

1

認爲這可能做的伎倆

public static IObservable<T> ProcessAll<T>(this IObservable<T> source, Func<T, IObservable<T>> processor, int mergeCount, TimeSpan failureDelay) 
{ 
    return Observable.Create<T>(
     observer => 
      { 
       var failed = new Subject<T>(); 

       return source.Merge(failed) 
         .Select(processor) 
         .Merge(mergeCount) 
         .Delay(failureDelay) 
         .Subscribe(failed.OnNext, observer.OnError, observer.OnCompleted); 
      }); 
} 

而且使用這樣的:

GetDataSource(filePath, 1048576) 
    .ProcessAll(batch => WriteToDBAndGetFailedSource(conn, batch), 100, TimeSpan.FromMilliseconds(500)) 
    .Subscribe(); 

ProcessAll是一個可怕的名字,但它是星期五晚上,我不能想到的更好的一個。

+0

「失敗」如何完成? – mark