2017-10-13 65 views
0

我們正面臨一個間歇性問題,即當我們通過BigQuery Java API執行查詢時,那麼當我們執行相同的行數時,我們得到的行數不匹配通過BigQuery UI進行查詢。當我們通過它執行查詢時BigQuery Java API不返回所有行

在我們的代碼中,我們使用QueryResponse對象執行查詢,我們也檢查查詢是否完成或不通過檢查標誌 GetQueryResultsResponse.getJobComplete(),我們也有機制來拉動更多的記錄,如果該查詢不返回所有行一短while(queryResult.getRows() != null && queryResult.getTotalRows().compareTo(BigInteger.valueOf((queryResult.getRows().size()))) > 0) {

以下是我們用來執行查詢的代碼段:

int retryCount = 0; 
    long waitTime = Constant.BASE_WAIT_TIME; 
    Bigquery bigquery = cloudPlatformConnector.connectBQ(); 
    QueryRequest queryRequest = new QueryRequest(); 
    queryRequest.setUseLegacySql(useLegacyDialect); 
    GetQueryResultsResponse queryResult = null; 
    GetQueryResultsResponse queryPaginationResult = null; 
    String pageToken; 
    do{ 
     try{ 
       QueryResponse query = bigquery.jobs().query(this.projectId, queryRequest.setQuery(querySql)).execute(); 
       queryResult = bigquery.jobs().getQueryResults(query.getJobReference().getProjectId(), query.getJobReference().getJobId()).execute();     
       if(queryResult != null){ 
        if(!queryResult.getJobComplete()){ 
         LOGGER.info("JobId for the query : "+ query.getJobReference().getJobId() + " is Job Completed : "+ queryResult.getJobComplete()); 
         if(queryResult.getErrors() != null){ 
          for(ErrorProto err: queryResult.getErrors()){ 
           LOGGER.info("Errors in query, Reason : "+ err.getReason()+ " Location : "+ err.getLocation() +" Message : "+ err.getMessage()); 
          } 
         } 
         LOGGER.info("Query not completed : "+querySql); 
         throw new IOException("Query is failing retrying it"); 
        } 
       } 
       LOGGER.info("JobId for the query : "+ query.getJobReference().getJobId() + " is Job Completed : "+ queryResult.getJobComplete() + " Total rows from query : " + queryResult.getTotalRows()); 
       pageToken = queryResult.getPageToken(); 
       while(queryResult.getRows() != null && queryResult.getTotalRows().compareTo(BigInteger.valueOf((queryResult.getRows().size()))) > 0) { 
        LOGGER.info("Inside the Pagination code block, Page Token : "+pageToken); 
        queryPaginationResult = bigquery.jobs().getQueryResults(projectId,query.getJobReference().getJobId()).setPageToken(pageToken).setStartIndex(BigInteger.valueOf(queryResult.getRows().size())).execute(); 
        queryResult.getRows().addAll(queryPaginationResult.getRows()); 
        pageToken = queryPaginationResult.getPageToken(); 
        LOGGER.info("Inside the Pagination code block, total size : "+ queryResult.getTotalRows() + " Current Size : "+ queryResult.getRows().size()); 
       } 

     }catch(IOException ex){ 
       retryCount ++; 
       LOGGER.info("BQ Connection Attempt "+retryCount +" failed, Retrying in " + waitTime + " seconds"); 
       if (retryCount == Constant.MAX_RETRY_LIMIT) { 
        LOGGER.info("BQ Connection Error", ex); 
        throw ex; 
       } 
       try { 
        Thread.sleep(waitTime); 
       } catch (InterruptedException e) { 
        LOGGER.info("Thread Error"); 
       } 
       waitTime *= 2; 
     } 
    }while((queryResult == null && retryCount < Constant.MAX_RETRY_LIMIT) || (!queryResult.getJobComplete() && retryCount < Constant.MAX_RETRY_LIMIT)); 
    return queryResult.getRows(); 

查詢中,我沒有得到所有的行沒有任何限制條款在裏面。

目前我們使用的是google-cloud-bigquery的0.5.0版本。

在此先感謝!

回答

1

我認爲在後續調用getQueryResults時,您需要正確撥打setPageToken,並使用前一頁返回的pageToken。否則getQueryResults只會從第一頁返回行。

+0

謝謝,阮,我試過這個東西,但沒有成功,我面臨着同樣的問題,它看起來像我的流程永遠不會進入分塊的情況下拉扯更多記錄的塊。我也更新了我的代碼,考慮pageToken。 – Amandeep

+0

我看到你將'.setPageToken(queryResult.getPageToken())'添加到了while循環中。但是不會'queryResult.getPageToken()'總是第一頁的頁面標記?我認爲你需要獲得每個getQueryResults調用的查詢標記,並將其放入下一個調用中。 (1)你所期望的行數是多少,(2)你的代碼獲取的行數是多少,(3)什麼是信息日誌輸出它。 –

+0

我得到了你所說的,我可以更新我的代碼,但我沒有看到任何我在while循環中輸入的日誌語句,這迫使我認爲這個問題與分頁無關。 (1)如果我們談論10月14日,我期望的總行數會有所不同,但我期望3978,但只有3972,差異並不總是如此接近,有時我也看到超過500行的差異 – Amandeep