2016-11-07 41 views
1

我使用rxjava 1.2.2。rxjava從緩衝區中檢索最大值

從我的List開始,我想填充一個緩衝區,然後按照例如每5秒鐘過濾緩衝區的Max項目的方式,只需要發送Max Item過濾器。

Observable<Item> EventEmitter = Observable.from(itemsList); 

Observable<List<Item>> tapBufferEmitter = tapEventEmitter.buffer(5, TimeUnit.SECONDS); 

MathObservable.from(tapBufferEmitter).max(new Comparator<List<Item>>() { 

      @Override 
      public int compare(List<Item> o1, List<Item> o2) { 
       int m1 =o1.getVal(); 
       int m2 = o1.getVal(); 
       if (m1 == m2){ 
        return 0; 
       } else if (m1 > m2){ 
        return 1; 
       } else { 
        return -1; 
       }       
      } 
     }).subscribeOn(Schedulers.from(executor1)) 
     .subscribe(s -> { 
      System.out.println("Called thread: " + Thread.currentThread().getId()); 

      syso.("Max Item is:" + s.getId()); 
     }, e -> System.out.println(e.getMessage())); 

但是,上面的代碼段當然不起作用。我不想比較2列表o1和o2,但我只想比較同一列表的項目。

最大運營商是否是正確的選擇?請注意,我沒有比較整數,但項目。每個項目是一個固定領域的珠子。我想要這個字段的最大值的那個。

如何從緩衝區中選擇最大值? 謝謝

+0

你有沒有也要導入庫文件RxJavaMath,因爲MathObservable不在RxJava包中。 –

+0

當然是......但我想知道運營商的正確拼接。 – Alex

+0

我編輯了這個問題......你能再讀一遍嗎? – Alex

回答

1

我寫了一個例子,說明如何使用MathObservable.max運算符。請注意,我確實使用了窗口而不是緩衝區,因爲緩衝區將返回一個List,並且窗口會給我一個Observable,我可以在flatMap和MathObservablen中重用它。 MathObservable然後將計算給定窗口的最大值(可觀察到5個元素)。

搖籃:

compile 'io.reactivex:rxjava:1.2.1' 
compile 'io.reactivex:rxjava-math:1.0.0' 

窗口:

@Test 
public void windowMaxTest() throws Exception { 
    Observable<Integer> just = Observable.just(10, 9, 8, 4, 7, 5, 6, 8, 4, 3); 

    Observable<Integer> integerObservable1 = just.window(5) 
      .flatMap(integerObservable -> { 
       return MathObservable.max(integerObservable); 
      }); 

    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>(); 

    integerObservable1.subscribe(testSubscriber); 

    testSubscriber.awaitTerminalEvent(); 
    testSubscriber.assertValues(10, 8); 
} 

緩衝區:

@Test 
public void bufferMaxTest() throws Exception { 
    Observable<Integer> just = Observable.just(10, 9, 8, 4, 7, 5, 6, 8, 4, 3); 

    Observable<Integer> integerObservable1 = just.buffer(5) 
      .flatMap(integerObservable -> { 
       return MathObservable.max(Observable.from(integerObservable)); 
      }); 

    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>(); 

    integerObservable1.subscribe(testSubscriber); 

    testSubscriber.awaitTerminalEvent(); 
    testSubscriber.assertValues(10, 8); 
} 

自定義對象::

class Item { 
    public int value; 

    public Item(int value) { 
     this.value = value; 
    } 
} 

@Test 
public void test3214() throws Exception { 
    final Item max1 = new Item(3); 
    final Item max2 = new Item(6); 
    final List<Item> myListOfItem = Arrays.asList(new Item(1), new Item(2), max1, new Item(4), new Item(5), max2); 

    Observable<Item> itemObservable1 = Observable 
      .from(myListOfItem) 
      .buffer(3) 
      .flatMap(itemObservable -> { 
       Observable<Item> from = Observable.from(itemObservable); 

       return MathObservable.from(from) 
         .max((item, t1) -> { 
          return Integer.compare(item.value, t1.value); 
         }); 
      }); 

    TestSubscriber<Item> testSubscriber = new TestSubscriber<>(); 

    itemObservable1.subscribe(testSubscriber); 

    testSubscriber.awaitTerminalEvent(); 

    testSubscriber.assertValues(max1, max2); 
} 
+0

我如何將它與緩衝區集成?使用flatmap運算符? – Alex

+0

我添加了緩衝區測試到我的答案。 –

+0

再次對不起...我對rx很新,我的輸入是列表,所以我不能使用正義運算符。 Cna你使用起始列表更新前面的例子? – Alex

0

一般來說,。降低()是獲得最大/最小/平均/總和等

reduce(0, /*return Math.max(lhs, rhs)*/) 

因此整個操作將是這樣的標準選擇 -

source.window(/*...*/).flatMap(/*return .reduce(0, /*return Math.max(lhs, rhs)*/)) 
+0

這是怎麼回事? .flatMap(/ * return .reduce(0,/ * return Math.max(lhs,rhs)* /))i mean mean return .reduce .... – Alex

+0

from original Observable 您使用.window創建Observable > ()運算符,然後調用.flatMap()和_inside_ .flatMap()運算符返回.flatMap()的參數 –

+0

上的.reduce(0,Math.max()),但如果我的項目有字段,我必須發出具有item.getIndex()的最大值的項目? – Alex