2015-12-09 80 views
1

這是一個初學者的問題。我將幾個進程的輸出回送到客戶端,並且我想將每個流存儲到單獨的文件中。重建嵌套的Observables

所以我想是(大字母表示嵌套流的末尾)

S: -a-a-a-b-b-a-A-c-b-B-c-c-...-d--C...D-e--...-z-E-z--z--... 

R: a-a-a-----a-A (complete) 
      b-b-------b-B (complete) 
        c-----c-c--------C (complete) 
           d------D (complete) 
             e--------E (complete) 

        . 
        . 
        . 
     (end many more nested streams coming) 
        . 

所以我想是觀測量的動態工廠。類似於using(),但據我所知using()創建Observables,只要原始Observable存在,而我想在每次嵌套流完成時完成並關閉文件。

重要 - 我不想在內存中緩衝,因爲這些是非常長的流(輸出長時間運行的進程)。所以我想避免buffer(),groupBy()

回答

1

如果你可以從一個值確定它是否是最後一個特定的字母,您可以在內部羣體使用groupBytakeUntil(您可能需要在lambda表達式的幾個試漁獲物):

source 
.groupBy(v -> v.type) 
.flatMap(g -> 
    Observable.using(
     () -> createOutput(g.getKey()), 
     f -> g.takeUntil(v -> v.isEnd).doOnNext(v -> f.write(v))), 
     f -> f.close() 
    ) 
) 
.subscribe(...) 

TakeUntil確保groupBy只保留子流,只要我們期望它的值(如果源已正確排序,則groupBy不會重新創建組)。

如果你真的想避免groupBy,你必須手動跟蹤每個打開的文件,並在適當的時候關閉它們:

Observable.using(
    HashMap::new, 
    map -> source 
     .doOnNext(v -> { 
      Output o = map.get(v.type); 
      if (o == null) { 
       o = new Output(v.type); 
       map.put(v.type, o); 
      } 
      o.write(v); 
      if (v.isEnd) { 
       o.close(); 
       map.remove(v.type); 
      } 
     }), 
    map -> map.values().forEach(e -> e.close()) 
) 
.subscribe(...); 
+0

感謝您的快速回復。我在看第二部分(因爲我真的不想使用'groupBy()')它看起來像工作,但這似乎不是一個反應性解決方案。它只是一個監聽器,它根據其私有hashmap調度數據。所以這是一個標準的必要途徑。有沒有辦法以純粹的被動方式解決問題? –

+0

選項#1更具反應性,如果您可以在反應性文件庫中保留,則可以在保存時完全異步。否則,如果能夠更有效地解決您的問題,請不要混淆這兩個概念。 – akarnokd

+0

這不是恐懼。簡單地說,我想學習如何編寫適當的反應式代碼。和我在編寫Java代碼時不想內聯彙編程序一樣(不知道這是否可能),我不想將非反應性技術注入到反應代碼中。選項#1是毫無疑問的反應,但它將數據積累在內存中。這是我不能做的事情,正如原來的問題已經解釋過的那樣。 –