在一個Node應用程序中,我試圖用RxJS來處理事件流。事件流是許多文檔更改的列表。我使用groupBy通過documentId將流分割成新的流。但是我想知道,一旦文檔在客戶端關閉,並且沒有新事件添加到該文檔標識的流中,groupBy會在該文檔流爲空時處理該文檔的流?如果不是,我會如何手動做到這一點?我想避免由正在創建但未被銷燬的新文檔流引起的內存泄漏。將Rx.Observable.groupBy清理空流嗎?
回答
既然你包括.NET標籤,我將介紹Rx.NET爲好。
你的問題是措辭有點不正確。當且僅當它們從未有事件時,流纔是空的。所以,他們不能成爲空。不發射數據的流通常不會消耗太多的資源。
在.NET中,集團將不會終止,直到源終止。我們使用'GroupByUntil`,它允許你爲每個組指定一個durationSelector流。 Observable.Timer經常適用於此。
這意味着,你可能會得到多個非併發流使用相同的密鑰出現一段時間,但如果(這是常有的情況),您的組流在某一點扁平,都不會有問題。
在rxjs,我們也有groupByUntil。
在Rx-Java中,其行爲相似的groupByUntil方法分爲groupBy-有關更多詳細信息,請參閱https://github.com/ReactiveX/RxJava/pull/1727和https://github.com/benjchristensen/RxJava/commit/b9302956832e3e77579f63fd9db25aa60eb4192a。
http://reactivex.io/documentation/operators/groupby.html說:
如果從GroupedObservables之一退訂,那GroupedObservable將被終止。如果源Observable稍後發出一個項,該項的鍵匹配以這種方式終止的GroupedObservable,則groupBy將創建併發出一個新的GroupedObservable以匹配該鍵。
因此,在Rx-Java中,您必須從分組的可觀察流中取消訂閱才能終止它。 takeUntil
與timer
流可以爲此工作。
附錄:
在回答您的意見,流將不會終止,直到它從下游運營商取消訂閱。 groupByUntil的持續時間選擇器將導致終止。如果文檔一旦關閉就不會再次打開,那麼您可以將「documentclosed」事件發送到流中,並使用常規groupBy和takeWhile測試「documentClosed」。
重新打開文檔不會再打開的原因是由於groupBy(在rx-js和rx.net中),如果已經看到的鍵再次出現,則不會創建新的組。
如果這是一個問題,那麼您將需要使用groupByUntil並使用發佈的流來監視documentClosed事件 - 使用已發佈的流將確保您不會收到訂閱副作用。
我會建議做:
,而不是僅僅有一個documentChanges觀察到,有documentEvents觀察到。
客戶端將發送documentOpened事件,當他們打開一個文檔,documentChanged事件時,他們改變文件和documentClosed事件時,他們關閉文檔。
通過發送所有3種類型的事件通過相同的可觀察,你建立並保證一個排序。如果一個客戶端發送了文檔已打開,documentChanged,documentClosed事件按照該順序,那麼您的服務器將按照該順序看到它們。請注意,對於由2個不同客戶端發送的事件的順序不會有任何保證。這將讓您確保特定客戶發送的事件順利進行。
然後,這是你如何使用groupByUntil
:
documentEvents
.groupByUntil(
function (e) { return e.documentId; }, // key
null, // element
function (group) { // duration selector
var documentId = group.key;
return group.filter(function (e) { return e.eventType === 'documentClosed'; });
})
.flatMap(function (eventsForDocument) {
var documentId = eventsForDocument.key;
return eventsForDocument.whatever(...);
})
.subscribe(...);
另一種選擇是簡單了很多:你可以終止該組的空閒週期之後。根據你對事件所做的事情,這可能綽綽有餘。如果文檔在5分鐘內未被編輯,則此示例會到期。如果有更多的編輯進來,那麼一個新的團隊即將啓動。
var idleTime = 5 * 60 * 1000;
events
.groupByUntil(
function(e) { return e.documentId; },
null,
function(g) { return g.debounce(idleTime); })
.flatMap...
- 1. 如何清空輸入流?
- 2. 遞歸空節點清理
- 3. 在Rails中清理Markdown嗎?
- 4. Angular2如何清空觀察到流
- 5. 流動演員能處理rtsp流嗎?
- 6. 流是理想的嗎?
- 7. 使用HTTP直播流精確清理
- 8. 騾流清理代碼方法
- 9. 關閉資源清理的Akka流
- 10. std :: getline()完全清空緩衝區嗎?
- 11. statusBarHidden:YES;但空的/清空的空間還在嗎?
- 12. 如何清理連續阿卡流中的子流
- 13. 我需要將參數清理到SQL存儲過程嗎?
- 14. 我應該將輸入清理爲參數化查詢嗎?
- 15. laravel清理空查詢變量
- 16. Word VBA宏清理空段並在
- 17. ReSharper的代碼清理和空行
- 18. XSLT - 遞歸空節點清理
- 19. 字符串空白清理不修剪
- 20. Windows RCP空閒連接清理
- 21. 如何清理NSString中的空格
- 22. 邏輯錯誤清理空表
- 23. 可以用cleanNode()來清理綁定嗎?
- 24. 這是清理StringBuilder的好方法嗎?
- 25. 建議您清理密碼輸入嗎?
- 26. WPF佈局,我可以清理它嗎?
- 27. 文件清理過程有問題嗎?
- 28. 我可以清理這個jQuery嗎?
- 29. pthread可以自行執行清理嗎?
- 30. 我正在做AS3參考清理嗎?
您最終需要讓客戶端發出*文檔關閉*事件。然後,您可以像James所描述的那樣使用'groupByUntil',並提供由文檔ID過濾的文檔關閉事件流作爲'durationSelector'子句。 – Brandon
謝謝布蘭登。對於我關於終止一個流的評論,當它正在處理中的事件時,我會喜歡你的想法。 –