2014-04-28 144 views
1

Rx有很好的功能Observable.Buffer。但在現實生活中存在問題。如何重試熱觀察?

場景:應用程序向數據庫發送一個事件流。逐個插入事件很昂貴,所以我們需要對它進行批處理。我想爲此使用Observable.Buffer。但是插入到DB中的失敗機率很小(死鎖,超時,停機等)。

我可以添加一些重試邏輯到批處理函數本身,但它會反對Rx的可實現性的想法。 Observable.Retry不會削減它,因爲它會重新訂閱「熱門」來源,這意味着失敗的批次將會丟失。

是否有函數,我可以編寫來實現所需的效果,還是我需要實現自己的擴展?我想是這樣的:

_inputBuffer = new BufferBlock<int>(); 
_inputBuffer.AsObservable(). 
    Buffer(TimeSpan.FromSeconds(10), 1000). 
    Do(batch => SqlSaveBatch(batch)). 
    {Retry???}. 
    Subscribe() 

做到盡善盡美,我希望能夠克服時的onComplete被稱爲控制局面,而重試緩衝器有不完整的批次,並能夠執行某些操作(發送錯誤電子郵件,將數據保存到本地文件系統等)

回答

3

當保存到數據庫失敗並需要重試時,它不是真正的流或錯誤的事件,它是針對事件採取的操作。

我會組織你的代碼更是這樣的:

IDisposable subscription = 
    _inputBuffer.AsObservable(). 
    Buffer(TimeSpan.FromSeconds(10), 1000). 
    Subscribe(
     batch => SqlSaveBatchWithRetryLogic(batch), 
     () => YourOnCompleteAction); 
  • 可以提供內部SqlSaveBatchWithRetryLogic()
  • 事件
  • 手柄的onComplete內YourOnCompleteAction()
  • 您可以選擇配置的重試邏輯如果您未能保存批次,請在SqlSaveBatchWithRetryLogic()之內訂閱。
  • 這也消除Do副作用。

雖然我會小心這種方法 - 您需要觀察重試邏輯。你沒有背壓(減慢輸入的方式)。因此,如果您有任何退避/重試,您將冒着備份和填充內存的風險。如果您開始在計數限制內始終看到批次,那麼您可能會遇到麻煩!您可能想要實施一個計數器來監視未完成的項目。

+0

在完美的解決方案中,我將能夠重用此重試邏輯。否則,我將不得不拿出SqlSaveBatchWithRetryLogic,RabbitMQWithRetryLogic,MemcachedWithRetryLogic等。 –

+0

您可以將該抽象放入處理程序中,不是嗎? –

+0

我無法看到「YourOnCompleteAction」如何等待SqlSaveBatchWithRetryLogic內部緩衝區泄漏。我的意思是我可以看到如何實現它,但它會涉及這兩個函數之間共享的狀態變量,這會涉及更多的代碼,而不是我想要的。 –