2016-07-06 13 views
4

我有一個模擬長時間運行過程的睡眠方法。如何讓這個rxjava zip並行運行?

private void sleep() { 
    try { 
     Thread.sleep(2000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

然後我有一個方法返回一個Observable,其中包含參數中給出的2個字符串的列表。它在返回字符串之前調用睡眠。

private Observable<List<String>> getStrings(final String str1, final String str2) { 
    return Observable.fromCallable(new Callable<List<String>>() { 
     @Override 
     public List<String> call() { 
      sleep(); 
      List<String> strings = new ArrayList<>(); 
      strings.add(str1); 
      strings.add(str2); 
      return strings; 
     } 
    }); 
} 

然後我打電話的getStrings在Observalb.zip三次,我期望這三個電話並行運行,因此執行的總時間應2秒或可能在3秒內大部分是因爲睡眠只有2秒。但是,它總共需要六個秒。 如何讓它平行運行,以便在2秒內完成?

Observable 
.zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists()) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(new Observer<List<String>>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable e) { 

    } 

    @Override 
    public void onNext(List<String> strings) { 
     //Display the strings 
    } 
}); 

的mergeStringLists方法

private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() { 
    return new Func3<List<String>, List<String>, List<String>, List<String>>() { 
     @Override 
     public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) { 
      Log.d(TAG, "..."); 

      for (String s : strings2) { 
       strings.add(s); 
      } 

      for (String s : strings3) { 
       strings.add(s); 
      } 

      return strings; 
     } 
    }; 
} 

回答

7

這是發生因爲訂閱您zipped觀察到的發生在同一個,io線程。

爲什麼不試試這個:

Observable 
    .zip(
     getStrings("One", "Two") 
      .subscribeOn(Schedulers.newThread()), 
     getStrings("Three", "Four") 
      .subscribeOn(Schedulers.newThread()), 
     getStrings("Five", "Six") 
      .subscribeOn(Schedulers.newThread()), 
     mergeStringLists()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(new Observer<List<String>>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(List<String> strings) { 
      //Display the strings 
     } 
    }); 

讓我知道這是否有助於

+1

'Schedulers.io()'是默認情況下是根據需要增長的線程池。 –

+0

@TassosBassoukos你的意思是Schedulers.io()會根據需要自動創建新線程? –

+0

@Bartek你的解決方案工作,你知道是否有任何其他解決方案比你的解壓縮並行運行? –

1

這裏我有,我沒有使用異步雙向拉鍊,以防萬一you're爲例好奇

  /** 
      * Since every observable into the zip is created to     subscribeOn a diferent thread, it´s means all of them will run in parallel. 
      * By default Rx is not async, only if you explicitly use subscribeOn. 
       */ 
      @Test 
      public void testAsyncZip() { 
       scheduler = Schedulers.newThread(); 
       scheduler1 = Schedulers.newThread(); 
       scheduler2 = Schedulers.newThread(); 
       long start = System.currentTimeMillis(); 
       Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
                        .concat(s3)) 
      .subscribe(result -> showResult("Async in:", start, result)); 
      } 



     public Observable<String> obAsyncString() { 
return Observable.just("") 
       .observeOn(scheduler) 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
                  .getName()); 
       }) 
       .map(val -> "Hello"); 
     } 

     public Observable<String> obAsyncString1() { 
return Observable.just("") 
       .observeOn(scheduler1) 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
                  .getName()); 
       }) 
       .map(val -> " World"); 
     } 

     public Observable<String> obAsyncString2() { 
return Observable.just("") 
       .observeOn(scheduler2) 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
                  .getName()); 
       }) 
       .map(val -> "!"); 
     } 

您可以在這裏看到更多的例子https://github.com/politrons/reactive

+0

請格式化代碼 –