我有以下API:在Rx.Net中,我如何實現一個可觀察的反饋循環,直到反饋耗盡?
IObservable<IList<SqlDataRecord>> WriteToDBAndGetFailedSource(SqlConnection conn, IList<SqlDataRecord> batch)
它試圖寫分批進入數據庫。如果失敗,則返回整個批次,否則返回的觀察值爲空。
我也有一個源生產批次:
IObservable<IList<SqlDataRecord>> GetDataSource(string filePath, int bufferThreshold)
現在,我可以像這樣將它們組合起來:
var failedBatchesSource = GetDataSource(filePath, 1048576)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100);
這會寫所有批次(最多100併發)並返回可觀察到的失敗批次。
我真正想要的是在一定暫停之後將失敗的批次送回到批次源,可能是原始源仍在生成批次。我可以,當然,寫的是這樣的:
var failedBatchesSource = GetDataSource(filePath, 1048576)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100);
但它是錯的,當然,這是因爲:
- 這打破了有一個停頓之前,不合格批次再次處理的要求。
- 它可能會向數據庫生成超過100個併發寫入請求。
- 這就像展開一個迭代次數未知的for循環 - 非生產性的。
我也可以打出來的可觀察的單子一旦我收集了所有的失敗和重新開始一個循環中:
var src = GetDataSource(filePath, 1048576);
for (;;)
{
var failed = await src
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100)
.ToList();
if (failed.Count == 0)
{
break;
}
src = failed.ToObservable();
}
但不知而內的停留,我可以做的更好可觀察的monad。
魔鬼在細節。可以給出一個代碼示例? – mark