2014-10-01 45 views
4

我正在從遠程URL讀取文件並使用RxJava報告下載進度。文件編寫器Observable發出一系列DownloadProgress對象。由於很多項目正在發射,我使用Observable.sample()來管理背壓。這非常有效 - UI更新以恆定的速率進行,並且沒有背壓問題,但最後的進度更新幾乎總是被忽略。如何在調用rx.Observable.sample()時接收最後的序列發射?

我想收到Observable序列中的最後一項,以便我可以用最終進度更新UI。確保始終發出Observable序列中最後一項的最佳方法是什麼?

Observable<Response> fileReader = 
     Rx.okHttpGetRequest(url); 
OkHttpResponseWriter fileWriter = 
     Rx.okHttpResponseWriter(outFile); 

Subscription subscription = fileReader.flatMap(fileWriter) 
     .sample(1, TimeUnit.SECONDS) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Subscriber<DownloadProgress>() { 
      @Override 
      public void onCompleted() {} 
      @Override 
      public void onError(Throwable e) {} 
      @Override 
      public void onNext(DownloadProgress progress) { 
       // I want to receive an update every one second 
       // I also want to always receive the last progress update 
      } 
     }); 
+1

就處理得'onCompleted' ?如果我理解正確,您需要最後一次更新來處理完成的下載? – 2014-10-01 16:35:20

+1

'DownloadProgress'對象包含我想要向用戶公開的信息,包括從文件中讀取的總字節數。 'onCompleted'通知用戶序列已完成,但不會重新發送序列中的最後一個項目。假設從該值讀取數據至關重要。 – weefbellington 2014-10-01 19:04:17

回答

3

另外,使用buffer代替sample,將讓你發射的「額外「最後的人。我建議這樣做,這樣你就不會失去任何可組合性,也不會爲將來的開發人員(包括未來的你)帶來麻煩。

private Subscriber<List<Integer>> loggingSubscriber2 = new SimpleSubscriber<List<Integer>>() { 
    @Override 
    public void onNext(List<Integer> integers) { 
     Log.v(TAG, String.valueOf(integers.get(integers.size() - 1))); 
    } 
}; 

private void startObservableTests() { 
    Observable<Integer> fileObserver = Observable.create(
      new Observable.OnSubscribe<Integer>() { 
       @Override 
       public void call(Subscriber<? super Integer> subscriber) { 
        for (int i = 1; i <= 9; i++) { 
         subscriber.onNext(i); 
         try { 
          Thread.sleep(100); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 
        subscriber.onCompleted(); 
       } 
      }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 

    fileObserver.buffer(500, TimeUnit.MILLISECONDS).subscribe(loggingSubscriber2); 
} 
+0

這是一個非常聰明的解決方案,我永遠不會想到。我發佈了一個使用'window'而不是'buffer'的fancier版本,但它基本上是相同的技術。 – weefbellington 2014-10-02 13:40:45

2

這是我結束了:

private void startObservableTests() { 

    Observable<Integer> fileObserver = Observable.create(
      new Observable.OnSubscribe<Integer>() { 
       @Override 
       public void call(Subscriber<? super Integer> subscriber) { 
        for (int i = 1; i <= 9; i++) { 
         subscriber.onNext(i); 
         try { 
          Thread.sleep(100); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 
        subscriber.onCompleted(); 
       } 
      }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 


    fileObserver.sample(500, TimeUnit.MILLISECONDS).subscribe(new SuperSubscriber(fileObserver)); 
} 

private class SuperSubscriber extends Subscriber<Integer> { 

    Observable obs; 

    public SuperSubscriber(Observable<Integer> fileObserver) { 
     obs = fileObserver; 
    } 

    @Override 
    public void onCompleted() { 
     obs.last().subscribe(new SimpleSubscriber<Integer>() { 
      @Override 
      public void onNext(Integer o) { 
       Log.v(TAG, "final value was " + o); 
      } 
     }); 
    } 

    @Override 
    public void onError(Throwable e) { 

    } 

    @Override 
    public void onNext(Integer o) { 
     Log.v(TAG, "got a " + o); 
    } 
} 

這是輸出:

10-01 15:18:36.893 V/TAG (26129): got a 5 
10-01 15:18:38.214 V/TAG (26129): final value was 9 
1

以@Travis發佈的優秀解決方案爲基礎,以下是我最終的代碼。代替buffer,我組合使用windowswitchOnNext做一些進一步的處理:

Observable<Response> fileReader = 
    Rx.okHttpGetRequest(fileInfo.getUrl()); 
OkHttpResponseWriter fileWriter = 
    Rx.okHttpResponseWriter(outFile, totalBytesRead); 
Subscription subscription = Observable.switchOnNext(fileReader 
     .flatMap(fileWriter) 
     .window(1, TimeUnit.SECONDS)) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Subscriber<DownloadProgress>() { 
      @Override 
      public void onCompleted() {} 
      @Override 
      public void onError(Throwable e) {} 
      @Override 
      public void onNext(DownloadProgress progress) { 
       // always emits the most recent DownloadProgress, even the last one! 
      } 
}); 

說明:

  • fileReader需要URL,創建一個HTTP請求併發出Response
  • fileReader.flatMap(fileWriter)Response並將字節流寫入磁盤,發出一系列DownloadProgress對象
  • window(1, TimeUnit.Seconds)發出List<DownloadProgress>對象每一秒,然後包裝並重新發出他們爲Observable<DownloadProgress>
  • Observable.switchOnNext()注意到最近發出Observable<DownloadProgress>併發出最後DownloadProgress對象序列中
+0

我得到一個MissingBackpressureException – user949884 2015-02-24 12:00:05

+0

我有什麼想法嗎? – user949884 2015-02-24 12:23:56

相關問題