2017-01-03 229 views
1

我二百十三分之二百十二網頁上執行從書Reactive Programming with RxJava一個例子:樣品不停止在完成事件

Observable<String> names = Observable 
     .just("Mary", "Patricia", "Linda", "Barbara", 
       "Elizabeth", "Jennifer", "Maria", "Susan", 
       "Margaret", "Dorothy"); 
Observable<Long> absoluteDelayMillis = Observable 
     .just(0.1, 0.6, 0.9, 1.1, 
       3.3, 3.4, 3.5, 3.6, 
       4.4, 4.8) 
     .map(d -> (long) (d * 1_000)); 
Observable<String> delayedNames = names 
     .zipWith(absoluteDelayMillis, 
       (n, d) -> Observable 
         .just(n) 
         .delay(d, TimeUnit.MILLISECONDS)) 
     .flatMap(o -> o); 
delayedNames 
     .sample(1, SECONDS) 
     .subscribe(System.out::println); 

當我運行的代碼,輸出爲:

Linda 
Barbara 
Susan 
Dorothy 

根據(也是我認爲的),Dorothy不應該在那裏,因爲sample()應該轉發完成事件@ 4.8s。

我正在與rxjava 1.1.6

我錯過了的例子嗎?

回答

-1

Sample是一種定時器,每隔一段時間滴答一次,從「緩衝區」中選取最後一項。

在你的情況,如果你將修改您的最後一個可觀察的:

long time = System.currentTimeMillis(); 
delayedNames 
     .doOnNext(n -> System.out.println(String.format("%s - %d", n, (System.currentTimeMillis() - time)))) 
     .sample(1, SECONDS) 
     .doOnCompleted(() -> System.out.println(String.format("complete - %d", (System.currentTimeMillis() - time)))) 
     .subscribe(System.out::println); 

你會看到輸出類似的東西:

Mary-155 
Patricia-657 
Linda-959 
Linda 
Barbara-1156 
Barbara 
Elizabeth-3355 
Jennifer-3460 
Maria-3558 
Susan-3658 
Susan 
Margaret-4460 
Dorothy-4856 
Dorothy 
complete - 4852 

所以讓我們一步一步來。

  • 第一次打勾發生在1000。正如您在輸出中看到的,緩衝區中的最後一項是Linda
  • 第二次滴答發生在2000。只有Barbara處於緩衝區中。打印它。
  • 第三次打勾發生在3000。沒有什麼是緩衝區。
  • Forth tick發生在4000Susan是緩衝區中的最後一個。打印。
  • 第五次打勾發生在5000Dorothy是最後一個緩衝區。打印。

UPD:

其實,有關於5000沒有打勾,而且似乎sample總會發出緩衝區最後一個項目。例如,如果你將修改源觀測:

Observable<String> names = Observable 
      .just("Mary", "Patricia", "Linda", "Barbara", 
        "Elizabeth", "Jennifer", "Maria", "Susan"); 
    Observable<Long> absoluteDelayMillis = Observable.just(0.1, 0.6, 0.9, 1.1, 3.3, 3.4, 3.5, 3.6) 

它會打印:

Mary - 153 
Patricia - 654 
Linda - 957 
Linda 
Barbara - 1157 
Barbara 
Elizabeth - 3358 
Jennifer - 3457 
Maria - 3559 
Susan - 3658 
Susan 
complete - 3659 

UPD2:

我創建bug report

UPD3:

我檢查過並且在rxjava2中它按預期工作。

+0

我也認爲這是一個錯誤,因爲根據[Observable.sample(Long,Timeunti)javadocs]中的mable圖表(http://www.atetric.com/atetric/javadoc/io.reactivex/rxjava/ 1.1.6/rx/Observable.html#sample-long-java.util.concurrent.TimeUnit-),最後一項不應該被髮射。 – TmTron

1

這實質上是在2.0版本中刪除了RxJava導致的一個錯誤。您對樣本操作員的理解是正確的。

1

爲了澄清,這不符合RxJava 1.x中的錯誤,因爲這是一個requested behavior早在2016年年初

然而,這RxJava 2.X內忽視,作爲2.0.4它doesn」發出最後一個緩衝項目,就像1.1.3版本一樣。

不幸的是,沒有解決方法,但有一個enhancement PR張貼,將允許選擇模式爲sample

+0

這是RxJava 1.1.6中的一個錯誤 - 只是它不在代碼中,而是在[Observable.sample(Long,Timeunti)]的java文檔中(http://www.atetric.com/atetric/)的Javadoc/io.reactivex/rxjava/1.1.6/RX/Observable.html#樣品長java.util.concurrent.TimeUnit-)。 另外這本書是錯誤的,因爲它提到所有的例子都應該與RxJava 1.1.6一起工作(除非另有說明 - 這個例子不是這種情況)。 – TmTron