5

我現在進入的情況很難解釋,所以我會寫一個更簡單的版本來解釋這個問題。用RxJava限制吞吐量

我有一個Observable.from()它發出文件的一個ArrayList定義的文件序列。所有這些文件都應該上傳到服務器。爲此,我有一個可以完成這項工作的功能並返回Observable

Observable<Response> uploadFile(File file); 

當我運行這段代碼它變得瘋狂,在Observable.from()發出的所有文件,並在上傳都在的,或至少是線程,它可以處理的最大值。

我想最多並行2個文件上傳。有沒有可以爲我處理這個問題的操作員?

我試圖緩衝窗口和其他一些人,但他們似乎只發出兩個項目一起,而不必不斷兩條平行的文件上傳。我也嘗試在上傳部分設置最大線程池,但這不能用於我的情況。

這個權利應該有一個簡單的操作符?我錯過了什麼嗎?

回答

4

我認爲所有文件都是並行上傳的,因爲您使用的是flatMap(),它同時執行所有轉換。相反,您應該使用concatMap(),它會一個接一個地運行一個轉換。要運行兩個並行上傳,您需要在您的文件上調用window(2) observable,然後像在代碼中那樣調用flatMap()

Observable<Response> responses = 
    files 
     .window(2) 
     .concatMap(windowFiles -> 
     windowFiles.flatMap(file -> uploadFile(file)); 
    ); 

UPDATE

我發現了一個更好的解決方案,這不正是你想要的東西。有flatMap()的超載接受最大併發線程數。

Observable<Response> responses = 
    files 
     .onBackpressureBuffer() 
     .flatMap(index -> { 
     return uploadFile(file).subscribeOn(Schedulers.io()); 
     }, 2); 
+0

這聽起來很完美!我會嘗試並讓你知道。 –

+0

不錯的窗口操作工作現在完美!我怎樣才能讓窗戶移動?現在,如果窗口發出文件1和文件2以供上傳,則等待兩者都完成。如果文件2完成且1仍在進行中,是否可以執行文件3的上載? –

+0

我不確定使用默認的操作符是可能的,所以您可能需要編寫自己的操作符。 – Michael