2016-11-17 51 views
0

我有api從服務器下載單個mp3文件,這是使用RxJava作爲波紋管消耗的。RxAndroid下載多個文件,最多3個併發線程

Observable<ResponseBody> observable = audioService.getFile(fileNameWithExtension); 
     observable.subscribeOn(Schedulers.newThread()) 
       .observeOn(Schedulers.newThread()) 
       .subscribe(someCallBackClass<ResponseBody>); 

這只是下載單個文件,回調將文件保存在磁盤上。 我想下載文件列表,將每個文件保存在磁盤上,並等待所有下載完成,最多3個調用應該並行執行。 如何用RXAndroid做到這一點,我嘗試了flatmap,但我無法完全理解它。

編輯新代碼

List<Observable<Response<ResponseBody>>> audioFiles = new ArrayList<>(); 

    for (String fileNameWithExtension : fileNamesWithExtension) { 
     Observable<Response<ResponseBody>> observable = restFactory.getAudioService().getFile(fileNameWithExtension); 
     audioFiles.add(observable); 
    } 

    Observable.from(audioFiles).flatMap(audioFile -> Observable.fromCallable(() -> { 
     audioFile.subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .toBlocking() 
       .subscribe(new CallBackWithErrorHandling<>(Downloader.this)); 
     return 0; 
    }).subscribeOn(Schedulers.io()), MAX_CONCURRENT) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Subscriber<Integer>() { 
       @Override 
       public void onCompleted() { 
        goToMainActivity(); 
       } 

       @Override 
       public void onError(Throwable e) { 
        Log.e(TAG, "Something went wrong , " + Thread.currentThread().getName()); 
        Log.e(TAG, "Something went wrong , " + e.toString()); 
        showToast(R.string.something_went_wrong); 
        goToMainActivity(); 
       } 

       @Override 
       public void onNext(Integer integer) { 
       } 
      }); 

,這是工作的罰款,但是當網絡出現故障或互聯網連接速度較慢我得到

java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare() 

我無法瞭解哪些線正好需要observeOn() android主線程。

回答

2

您可以flatMap實現這一目標,限制了它的併發性,還需要在後臺調度,做文件傳輸內可觀測運行:

fileNames 
.flatMap(name -> { 
     return Observable.fromCallable(() -> { 
      // put your blocking download code here, save the data 
      return name; // return what you need down below 
     }) 
     .subscribeOn(Schedulers.io()); 
}, 3) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(completedFile -> { }, error -> { }, 
    () -> { /* all completed.*/ }); 

編輯:

既然你使用Observable API進行網絡下載,您無需阻止:

Observable.from(audioFiles) 
.flatMap(audioFile -> 
    audioFile.subscribeOn(Schedulers.io()), // <-- apply extra transforms here 
    MAX_CONCURRENT) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(completedFile -> { }, error -> { }, 
    () -> { /* all completed.*/ }) 

目前尚不清楚儘管你用CallBackWithErrorHandling做了什麼。