1

在一個Node應用程序中,我試圖用RxJS來處理事件流。事件流是許多文檔更改的列表。我使用groupBy通過documentId將流分割成新的流。但是我想知道,一旦文檔在客戶端關閉,並且沒有新事件添加到該文檔標識的流中,groupBy會在該文檔流爲空時處理該文檔的流?如果不是,我會如何手動做到這一點?我想避免由正在創建但未被銷燬的新文檔流引起的內存泄漏。將Rx.Observable.groupBy清理空流嗎?

+0

您最終需要讓客戶端發出*文檔關閉*事件。然後,您可以像James所描述的那樣使用'groupByUntil',並提供由文檔ID過濾的文檔關閉事件流作爲'durationSelector'子句。 – Brandon

+0

謝謝布蘭登。對於我關於終止一個流的評論,當它正在處理中的事件時,我會喜歡你的想法。 –

回答

1

既然你包括.NET標籤,我將介紹Rx.NET爲好。

你的問題是措辭有點不正確。當且僅當它們從未有事件時,流纔是空的。所以,他們不能成爲空。不發射數據的流通常不會消耗太多的資源。

在.NET中,集團將不會終止,直到源終止。我們使用'GroupByUntil`,它允許你爲每個組指定一個durationSelector流。 Observable.Timer經常適用於此。

這意味着,你可能會得到多個非併發流使用相同的密鑰出現一段時間,但如果(這是常有的情況),您的組流在某一點扁平,都不會有問題。

在rxjs,我們也有groupByUntil。

在Rx-Java中,其行爲相似的groupByUntil方法分爲groupBy-有關更多詳細信息,請參閱https://github.com/ReactiveX/RxJava/pull/1727https://github.com/benjchristensen/RxJava/commit/b9302956832e3e77579f63fd9db25aa60eb4192a

http://reactivex.io/documentation/operators/groupby.html說:

如果從GroupedObservables之一退訂,那GroupedObservable將被終止。如果源Observable稍後發出一個項,該項的鍵匹配以這種方式終止的GroupedObservable,則groupBy將創建併發出一個新的GroupedObservable以匹配該鍵。

因此,在Rx-Java中,您必須從分組的可觀察流中取消訂閱才能終止它。 takeUntiltimer流可以爲此工作。

附錄:

在回答您的意見,流將不會終止,直到它從下游運營商取消訂閱。 groupByUntil的持續時間選擇器將導致終止。如果文檔一旦關閉就不會再次打開,那麼您可以將「documentclosed」事件發送到流中,並使用常規groupBy和takeWhile測試「documentClosed」。

重新打開文檔不會再打開的原因是由於groupBy(在rx-js和rx.net中),如果已經看到的鍵再次出現,則不會創建新的組。

如果這是一個問題,那麼您將需要使用groupByUntil並使用發佈的流來監視documentClosed事件 - 使用已發佈的流將確保您不會收到訂閱副作用。

+0

OP正在使用'RxJs'(* node *)。他錯誤地標記了他的問題 – Brandon

+0

啊是的,爲此添加了一行...... –

+1

:)我真的指的是您的Rx-Java建議,這對RxJs沒有幫助,因爲RxJs像RX.NET一樣工作 – Brandon

2

我會建議做:

,而不是僅僅有一個documentChanges觀察到,有documentEvents觀察到。

客戶端將發送documentOpened事件,當他們打開一個文檔,documentChanged事件時,他們改變文件和documentClosed事件時,他們關閉文檔。

通過發送所有3種類型的事件通過相同的可觀察,你建立並保證一個排序。如果一個客戶端發送了文檔已打開,documentChanged,documentClosed事件按照該順序,那麼您的服務器將按照該順序看到它們。請注意,對於由2個不同客戶端發送的事件的順序不會有任何保證。這將讓您確保特定客戶發送的事件順利進行。

然後,這是你如何使用groupByUntil

documentEvents 
    .groupByUntil(
     function (e) { return e.documentId; }, // key 
     null, // element 
     function (group) { // duration selector 
      var documentId = group.key; 
      return group.filter(function (e) { return e.eventType === 'documentClosed'; }); 
     }) 
    .flatMap(function (eventsForDocument) { 
     var documentId = eventsForDocument.key; 
     return eventsForDocument.whatever(...); 
    }) 
    .subscribe(...); 

另一種選擇是簡單了很多:你可以終止該組的空閒週期之後。根據你對事件所做的事情,這可能綽綽有餘。如果文檔在5分鐘內未被編輯,則此示例會到期。如果有更多的編輯進來,那麼一個新的團隊即將啓動。

var idleTime = 5 * 60 * 1000; 
events 
    .groupByUntil(
     function(e) { return e.documentId; }, 
     null, 
     function(g) { return g.debounce(idleTime); }) 
    .flatMap...