2016-09-16 51 views

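Data is not aggregated in the resultSet of a Java Rx Observable

I am trying to call a stored procedure three times. When I run the code below, only the data from the last stored procedure call shows up in resultSet.getRows(). The data from the first two calls never appears in the result set. My code follows. Am I doing something wrong? Can anyone help?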

// All three statements call the same procedure; the variable names only label the three periods.
String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();

database.dbObject().getConnectionObservable().subscribe(
    connection -> {
        // Chain the three calls; the Observable returned by each flatMap lambda replaces the previous ResultSet.
        Observable<ResultSet> resultSetObservable = connection
            .callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")), jsonArray)
            .flatMap(first -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray))
            .flatMap(second -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")), jsonArray));

        resultSetObservable.subscribe(resultSet -> {
            handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
        }, error -> {
            error.printStackTrace();
        }, connection::close);
    },
    err -> {
        err.printStackTrace();
    }
);

Answers


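What you are trying to do can be achieved with the combineLatest operator, which hands you the results of all the observables together (it does not emit until every observable has produced a result).

Reference: http://reactivex.io/documentation/operators/combinelatest.html

The pseudocode is: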

String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();

database.dbObject().getConnectionObservable().subscribe(
    connection -> {
        // Create the three calls first, then combine them.
        Observable<ResultSet> firstCall = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")), jsonArray);
        Observable<ResultSet> secCall = connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray);
        Observable<ResultSet> thirdCall = connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")), jsonArray);

        // combineLatest emits once every call has produced a ResultSet; merge the rows of all three.
        Observable<List<JsonObject>> rowsObservable = Observable.combineLatest(firstCall, secCall, thirdCall,
            (firstRes, secRes, thirdRes) -> {
                List<JsonObject> rows = new ArrayList<>(firstRes.getRows());
                rows.addAll(secRes.getRows());
                rows.addAll(thirdRes.getRows());
                return rows;
            });

        rowsObservable.subscribe(rows -> {
            handler.handle(ReportUtils.parseSQLResult(rows));
        }, error -> {
            error.printStackTrace();
        }, connection::close);
    },
    err -> {
        err.printStackTrace();
    }
);
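
To see why the chained flatMap calls in the question deliver only the last result while a combining operator delivers all three, here is a minimal, dependency-free sketch. It is not RxJava: CompletableFuture stands in for the Observables (thenCompose plays the role of flatMap, thenCombine the role of combining), and the call helper and its row strings are hypothetical placeholders for the stored procedure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CombineSketch {

    // Hypothetical stand-in for one stored procedure call; returns two rows asynchronously.
    static CompletableFuture<List<String>> call(String period) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList(period + "-row1", period + "-row2"));
    }

    // Chaining: each step ignores the previous result, so only the last call's rows survive.
    static List<String> chained() {
        return call("current")
                .thenCompose(ignored -> call("prior"))
                .thenCompose(ignored -> call("today"))
                .join();
    }

    // Combining: wait for all three calls, then merge their rows into one list.
    static List<String> combined() {
        CompletableFuture<List<String>> first = call("current");
        CompletableFuture<List<String>> second = call("prior");
        CompletableFuture<List<String>> third = call("today");
        return first
                .thenCombine(second, (a, b) -> {
                    List<String> rows = new ArrayList<>(a);
                    rows.addAll(b);
                    return rows;
                })
                .thenCombine(third, (ab, c) -> {
                    List<String> rows = new ArrayList<>(ab);
                    rows.addAll(c);
                    return rows;
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println("chained rows:  " + chained().size());  // 2
        System.out.println("combined rows: " + combined().size()); // 6
    }
}
```

The chained version throws away the rows of the first two calls because nothing captures them before the next call replaces the value flowing down the chain; the combined version keeps a reference to every call and merges once all have finished.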

@Bharath Mg. I adapted your pseudocode and it is now working for me.

String currentPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));

database.dbObject().getConnectionObservable().subscribe(
    connection -> {
        // One Observable per call; nothing is chained, so no result is discarded.
        Observable<ResultSet> firstCall = connection.queryWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")));
        Observable<ResultSet> secondCall = connection.queryWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")));
        Observable<ResultSet> thirdCall = connection.queryWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")));

        // zip waits for one ResultSet from each call and passes all three to the Func3 (rx.functions.Func3).
        Observable.zip(firstCall, secondCall, thirdCall, new Func3<ResultSet, ResultSet, ResultSet, List<JsonObject>>() {
            @Override
            public List<JsonObject> call(ResultSet resultSet, ResultSet resultSet2, ResultSet resultSet3) {
                // Merge the rows of the three result sets into a single list.
                List<JsonObject> allRecord = new ArrayList<JsonObject>();
                allRecord.addAll(resultSet.getRows());
                allRecord.addAll(resultSet2.getRows());
                allRecord.addAll(resultSet3.getRows());
                return allRecord;
            }
        }).subscribe(resultSet -> {
            handler.handle(resultSet);
        }, error -> {
            error.printStackTrace();
        }, connection::close);
    },
    err -> {
        err.printStackTrace();
    }
);
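
The code above uses zip where the earlier pseudocode names combineLatest. When every source emits exactly one item, as a single query result does here, both operators produce one combined item. The difference only shows when a source emits more than once. The following is a rough plain-Java model of the two behaviours, not the RxJava implementation; the Event class and both methods are hypothetical names made up for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class ZipVsCombineLatest {

    // One emission: which source produced it (0-based) and the emitted value.
    static final class Event {
        final int source;
        final String value;
        Event(int source, String value) { this.source = source; this.value = value; }
    }

    // combineLatest model: once every source has emitted, emit the latest values on each new event.
    static List<String> combineLatest(int sources, List<Event> events) {
        String[] latest = new String[sources];
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            latest[e.source] = e.value;
            if (Arrays.stream(latest).allMatch(v -> v != null)) {
                out.add(String.join("+", latest));
            }
        }
        return out;
    }

    // zip model: pair the n-th value of every source; each value is consumed exactly once.
    static List<String> zip(int sources, List<Event> events) {
        List<Deque<String>> queues = new ArrayList<>();
        for (int i = 0; i < sources; i++) {
            queues.add(new ArrayDeque<>());
        }
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            queues.get(e.source).add(e.value);
            if (queues.stream().noneMatch(Deque::isEmpty)) {
                List<String> parts = new ArrayList<>();
                for (Deque<String> q : queues) {
                    parts.add(q.poll());
                }
                out.add(String.join("+", parts));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> single = Arrays.asList(new Event(0, "A"), new Event(1, "B"), new Event(2, "C"));
        System.out.println(combineLatest(3, single)); // [A+B+C]
        System.out.println(zip(3, single));           // [A+B+C]

        List<Event> repeated = Arrays.asList(
                new Event(0, "A1"), new Event(1, "B1"), new Event(0, "A2"), new Event(1, "B2"));
        System.out.println(combineLatest(2, repeated)); // [A1+B1, A2+B1, A2+B2]
        System.out.println(zip(2, repeated));           // [A1+B1, A2+B2]
    }
}
```

For three calls that each return one result set, either operator fits; zip makes the "exactly one item from each" intent explicit.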

Cool. By Stack Overflow's rules, if my solution led you to the answer, you should accept my answer :)