2017-03-14 174 views
0

我正在嘗試使用Rx Java進行並行化。我按照以下方式調用5個並行使用Rx Java的方法。RxJava並行執行問題

public Map findData(param1,param2){ 

       Observable<List<DataDTO>> data1 = 
          Observable.create(s->{ 
            try { 
             s.onNext(method1(param1,param2)); 
            } 
            catch(Exception e) { 
             System.out.println("Exception happened while calculating Data"); 
             s.onError(e); 
            } 

         }); 

         Observable<List<DataDTO>> data2 = 
           Observable.create(s->{ 
             try { 
              s.onNext(method2(param1,param2)); 
             } 
             catch(Exception e) { 
              System.out.println("Exception happened while calculating Data"); 
              s.onError(e); 
             } 
          }); 


         Observable<List<DataDTO>> data3 = 
           Observable.create(s->{ 
             try { 
              s.onNext(method3(param1,param2)); 
             } 
             catch(Exception e) { 
              System.out.println("Exception happened while calculating Data"); 
              s.onError(e); 
             } 
          }); 

         Observable<List<DataDTO>> data4 = 
           Observable.create(s->{ 
             try { 
              s.onNext(method3(param1,param2)); 
             } 
             catch(Exception e) { 
              System.out.println("Exception happened while calculating Data"); 
              s.onError(e); 
             } 
          }); 

         Observable<List<DataDTO>> data5 = 
           Observable.create(s->{ 
             try { 
              s.onNext(method4(param1,param2)); 
             } 
             catch(Exception e) { 
              System.out.println("Exception happened while calculating Data"); 
              s.onError(e); 
             } 
          }); 

      Observable<List<DataDTO>> mergedBean = Observable.merge(
         data1.subscribeOn(Schedulers.io()), 
        data2.subscribeOn(Schedulers.io()), 
        data3.subscribeOn(Schedulers.io()), 
        data4.subscribeOn(Schedulers.io())); 

     Observable<List<DataDTO>> mergedBean1 = Observable.merge(mergedBean,data5.subscribeOn(Schedulers.io())); 

    mergedBean1.subscribe(s->combineDataMethod(s,beanMap), 
         e -> {throw new BusinessException(e);}); 

    System.out.println("Returning Map created beanMap="+beanMap); 

     return beanMap; 
    } 

所有的方法都是並行執行的,我正在用組合的數據獲得beanMap。但問題是主線程在所有其他並行線程完成之前將beanMap返回給調用方方法。所以它返回一個空的地圖給調用者。在這裏實施並行呼叫的正確方式是什麼?

+0

你抱怨的主線程,但不顯示它運行的代碼。你認爲我們可以幫助你嗎? –

+0

嗨Alexei,我從一個休息服務類(JAX-RS)調用findData(param1,param2)方法,我需要操作從findData方法返回的服務中的數據。另外,可觀察對象中的每個方法都是數據庫查找。我試圖從DB平行地查找數據並嘗試返回到我的服務類。我正在Tomcat容器中運行它。 – Raj

回答

1

您將需要轉換的ObservableBlockingObservable

mergedBean1.toBlocking().subscribe(...); 

這將等到源可觀察完成。但是,爲什麼不簡單地返回Observable<Map>?這樣你不需要明確地等待,並且可以將其與其他並行操作組合。

此外,你的代碼應該是下面或多或少相當於:

Observable.just(
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))), 
    Observable.defer(() -> Observable.just(method1(param1,param2))) 
) 
.map(o -> o.subscribeOn(Schedulers.io())) 
.compose(Observable::merge) 
.doOnNext(s -> combineDataMethod(s,beanMap)) 
.doOnError(e -> System.out.println("Exception happened while calculating Data")) 
.onErrorResumeNext(e -> Observable.error(new BusinessException(e))) 
.toBlocking() 
.subscribe(); 
+0

感謝Tassos,讓我試試這個 – Raj

+0

嗨@Tassos我收到一個錯誤,說「方法toBlocking()是未定義的類型Observable <列表>」。 – Raj

+0

感謝@Tassos它的工作。我使用了blockingSubscribe() – Raj