我正在嘗試使用Rx Java進行並行化。我按照以下方式調用5個並行使用Rx Java的方法。RxJava並行執行問題
public Map findData(param1,param2){
Observable<List<DataDTO>> data1 =
Observable.create(s->{
try {
s.onNext(method1(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> data2 =
Observable.create(s->{
try {
s.onNext(method2(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> data3 =
Observable.create(s->{
try {
s.onNext(method3(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> data4 =
Observable.create(s->{
try {
s.onNext(method3(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> data5 =
Observable.create(s->{
try {
s.onNext(method4(param1,param2));
}
catch(Exception e) {
System.out.println("Exception happened while calculating Data");
s.onError(e);
}
});
Observable<List<DataDTO>> mergedBean = Observable.merge(
data1.subscribeOn(Schedulers.io()),
data2.subscribeOn(Schedulers.io()),
data3.subscribeOn(Schedulers.io()),
data4.subscribeOn(Schedulers.io()));
Observable<List<DataDTO>> mergedBean1 = Observable.merge(mergedBean,data5.subscribeOn(Schedulers.io()));
mergedBean1.subscribe(s->combineDataMethod(s,beanMap),
e -> {throw new BusinessException(e);});
System.out.println("Returning Map created beanMap="+beanMap);
return beanMap;
}
所有的方法都是並行執行的,我正在用組合的數據獲得beanMap。但問題是主線程在所有其他並行線程完成之前將beanMap返回給調用方方法。所以它返回一個空的地圖給調用者。在這裏實施並行呼叫的正確方式是什麼?
你抱怨的主線程,但不顯示它運行的代碼。你認爲我們可以幫助你嗎? –
嗨Alexei,我從一個休息服務類(JAX-RS)調用findData(param1,param2)方法,我需要操作從findData方法返回的服務中的數據。另外,可觀察對象中的每個方法都是數據庫查找。我試圖從DB平行地查找數據並嘗試返回到我的服務類。我正在Tomcat容器中運行它。 – Raj