Rx有很好的功能Observable.Buffer。但在現實生活中存在問題。如何重試熱觀察?
場景:應用程序向數據庫發送一個事件流。逐個插入事件很昂貴,所以我們需要對它進行批處理。我想爲此使用Observable.Buffer。但是插入到DB中的失敗機率很小(死鎖,超時,停機等)。
我可以添加一些重試邏輯到批處理函數本身,但它會反對Rx的可實現性的想法。 Observable.Retry不會削減它,因爲它會重新訂閱「熱門」來源,這意味着失敗的批次將會丟失。
是否有函數,我可以編寫來實現所需的效果,還是我需要實現自己的擴展?我想是這樣的:
_inputBuffer = new BufferBlock<int>();
_inputBuffer.AsObservable().
Buffer(TimeSpan.FromSeconds(10), 1000).
Do(batch => SqlSaveBatch(batch)).
{Retry???}.
Subscribe()
做到盡善盡美,我希望能夠克服時的onComplete被稱爲控制局面,而重試緩衝器有不完整的批次,並能夠執行某些操作(發送錯誤電子郵件,將數據保存到本地文件系統等)
在完美的解決方案中,我將能夠重用此重試邏輯。否則,我將不得不拿出SqlSaveBatchWithRetryLogic,RabbitMQWithRetryLogic,MemcachedWithRetryLogic等。 –
您可以將該抽象放入處理程序中,不是嗎? –
我無法看到「YourOnCompleteAction」如何等待SqlSaveBatchWithRetryLogic內部緩衝區泄漏。我的意思是我可以看到如何實現它,但它會涉及這兩個函數之間共享的狀態變量,這會涉及更多的代碼,而不是我想要的。 –