2017-10-08 112 views
0

我有一個整數列表和每個整數,我想從數據庫中獲取一個字符串值,並將其放入一個Map中,其中整數是來自父列表的整型值,並且字符串是從數據庫中獲取的字符串值。我也想要獲取並行運行的字符串。RxJava:循環中的異步調用

這是我用RxJava實現的。我的代碼工作,我得到了預期的結果,但我不認爲提取名稱並行運行。

public Observable<Map<Integer, String>> getVitalsForConnectedDevices(int accountId) { 
    List<Integer> ids = Lists.newArrayList(2, 3, 5); 

    return Observable<Map<Integer, String>> obs = Observable.from(ids) 
      .flatMap((Func1<Integer, Observable<Map.Entry<Integer, String>>>) integer 
        -> Observable.just(Maps.immutableEntry(integer, deviceDAO.getVitalName(integer))) 
        .subscribeOn(Schedulers.io()), 20) 
      .toMap(entry -> entry.getKey(), entry -> entry.getValue()); 
} 

這裏的getVitalName()方法

public String getVitalName(int vitalId) { 
    log.debug("id: " + vitalId); 
    String query = "SELECT name FROM vitals WHERE vital_id=?"; 
    String name = v1JdbcTemplate.queryForObject(query, String.class, vitalId); 
    log.debug("name: " + name); 
    return name; 
} 

其打印從上述功能的調試語句順序如下:

09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 2 
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Steps 
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 3 
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Floors 
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 5 
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Distance 

如果它運行的同時,應該不會吧打印所有的ID的第一個名字?我在這裏做錯了什麼?我如何讓他們平行運行?

+2

我覺得這篇文章會給你提供你需要的答案。即使這個例子也很相似:http://tomstechnicalblog.blogspot.com.by/2015/11/rxjava-achieving-parallelization.html –

回答

1

應該不是被打印所有ID的第一和名稱後

答案是否定的,因爲你所要求的,每個ID從數據庫字符串。結果完成後,將ID和字符串打包到一個條目中,然後將其壓入管道。請注意,如果所有flatMap完成或發生錯誤,toMap將只能成功完成。所以可能會,你會等待最後的結果一段時間。

作爲一個例子:你迭代5個ID。所有請求都是並行觸發的。服務器由於負載而處於困境中,所以一些請求需要一些時間。可以說所有的請求都完成了,但只有一個。只有最後一個請求完成後,結果地圖<>纔會被推送給用戶。如果這是你想要的,那麼我會建議返回一個單一的<>不是可觀察的。

這裏是輸出測試:

@Test 
void name() throws Exception { 
    Observable<Tuple2<Integer, String>> tuple2Observable = Observable.just(1, 2, 3, 4, 5, 6) 
      .flatMap(integer -> 
        Observable.fromCallable(() -> getVitalName(integer)) 
          .subscribeOn(Schedulers.io()) 
          .doOnNext(s -> System.out.println("Value:: " + Thread.currentThread().getName() + "-" + Instant.now())) 
          .map(s -> Tuple.of(integer, s)) 
      ).doOnComplete(() -> System.out.println("Finished:: " + Thread.currentThread().getName() + "-" + Instant.now())); 

    tuple2Observable.test() 
      .await(); 
} 

public String getVitalName(int vitalId) throws Exception { 
    System.out.println("getVitalName method called with vitalId = " + vitalId + "-" + Thread.currentThread().getName() + "-" + Instant.now()); 

    Thread.sleep(500); 

    String name = "le fake value"; 
    return name; 
} 

你看,那確實是卡列斯分別在不同的線程,當所有請求已經完成了觀察的同時成品製造。

getVitalName method called with vitalId = 4-RxCachedThreadScheduler-4-2017-10-08T21:20:43.785Z 
getVitalName method called with vitalId = 6-RxCachedThreadScheduler-6-2017-10-08T21:20:43.786Z 
getVitalName method called with vitalId = 5-RxCachedThreadScheduler-5-2017-10-08T21:20:43.785Z 
getVitalName method called with vitalId = 1-RxCachedThreadScheduler-1-2017-10-08T21:20:43.785Z 
getVitalName method called with vitalId = 2-RxCachedThreadScheduler-2-2017-10-08T21:20:43.784Z 
getVitalName method called with vitalId = 3-RxCachedThreadScheduler-3-2017-10-08T21:20:43.787Z 
Value:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.303Z 
Value:: RxCachedThreadScheduler-6-2017-10-08T21:20:44.303Z 
Value:: RxCachedThreadScheduler-1-2017-10-08T21:20:44.304Z 
Value:: RxCachedThreadScheduler-2-2017-10-08T21:20:44.304Z 
Value:: RxCachedThreadScheduler-3-2017-10-08T21:20:44.304Z 
Value:: RxCachedThreadScheduler-5-2017-10-08T21:20:44.304Z 
Finished:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.317Z