2016-10-28 22 views
0

我是ReactiveX的一名相對新手,已經學習了Rx.Observable.takeRx.Observable.takeLast分別從一個序列的開頭和結尾開始,並從Rx.Observable.windowWithCount獲取潛在的重疊窗口一個原始的可觀察的。爲了好玩,我想完全使用反應式運算符和換能器編寫FFT算法。一些算法很直觀,但有些很難用流模型。具體來說,一個rfft作用於一個序列的開始和結束值,我不知道該怎麼做。更具體地講,如果我有:reactivex:在一種窗口函數中採取take和take的組合

[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] 

它會被分解成可觀察窗口:

[[0,1,14,15],[2,3,12,13],[4,5,10,11],[6,7,8,9]] 

有一種優雅的方式來對任意觀察到的序列做到這一點?

回答

0

我不得不說,我不認爲這是一個好主意,使用反應有限的流,因爲你沒有任何事件反應或任何背壓。你必須知道流的長度並且它是有限的。最好的解決方案是使用o(1)的數組。儘管如此,這是一個可能的解決方案,它將使用許多cpu週期。我使用RxJava2-RC5進行測試。

@Test 
public void ttfAlgo() throws Exception { 
    Observable<Integer> ascendingOrdered = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 
      .concatWith(Observable.just(11, 12, 13, 14, 15)); 

    Observable<Integer> descendingOrdered = ascendingOrdered.sorted((t1, t2) -> { 
     return t2.compareTo(t1); 
    }); 

    Observable.zip(ascendingOrdered.window(2), descendingOrdered.window(2), (w1, w2) -> { 
     return w1.concatWith(w2.sorted()); 
    }).flatMap(window -> { 
     System.out.println("windowX"); 
     return window; 
    }).subscribe(integer -> { 
     System.out.println(integer); 
    }); 

    Thread.sleep(1_000); 
} 
+0

非常感謝!其中一部分是確定反應x的合適用途 - 看起來這不太合適。這個想法是在流式音頻上做fft,所以全部流並不是有限的,但是其流向fft的流是。 – mikesol

+0

是的,如果流是基於推送的,那麼您可能需要在rx派上用場的背壓。你可以用它來處理流緩衝區中的一些字節,並在緩衝窗口上並行啓動一些本地配對實現。 –