我們正面臨一個間歇性問題,即當我們通過BigQuery Java API執行查詢時,那麼當我們執行相同的行數時,我們得到的行數不匹配通過BigQuery UI進行查詢。當我們通過它執行查詢時BigQuery Java API不返回所有行
在我們的代碼中,我們使用QueryResponse對象執行查詢,我們也檢查查詢是否完成或不通過檢查標誌 GetQueryResultsResponse.getJobComplete(),我們也有機制來拉動更多的記錄,如果該查詢不返回所有行一短while(queryResult.getRows() != null && queryResult.getTotalRows().compareTo(BigInteger.valueOf((queryResult.getRows().size()))) > 0) {
以下是我們用來執行查詢的代碼段:
int retryCount = 0;
long waitTime = Constant.BASE_WAIT_TIME;
Bigquery bigquery = cloudPlatformConnector.connectBQ();
QueryRequest queryRequest = new QueryRequest();
queryRequest.setUseLegacySql(useLegacyDialect);
GetQueryResultsResponse queryResult = null;
GetQueryResultsResponse queryPaginationResult = null;
String pageToken;
do{
try{
QueryResponse query = bigquery.jobs().query(this.projectId, queryRequest.setQuery(querySql)).execute();
queryResult = bigquery.jobs().getQueryResults(query.getJobReference().getProjectId(), query.getJobReference().getJobId()).execute();
if(queryResult != null){
if(!queryResult.getJobComplete()){
LOGGER.info("JobId for the query : "+ query.getJobReference().getJobId() + " is Job Completed : "+ queryResult.getJobComplete());
if(queryResult.getErrors() != null){
for(ErrorProto err: queryResult.getErrors()){
LOGGER.info("Errors in query, Reason : "+ err.getReason()+ " Location : "+ err.getLocation() +" Message : "+ err.getMessage());
}
}
LOGGER.info("Query not completed : "+querySql);
throw new IOException("Query is failing retrying it");
}
}
LOGGER.info("JobId for the query : "+ query.getJobReference().getJobId() + " is Job Completed : "+ queryResult.getJobComplete() + " Total rows from query : " + queryResult.getTotalRows());
pageToken = queryResult.getPageToken();
while(queryResult.getRows() != null && queryResult.getTotalRows().compareTo(BigInteger.valueOf((queryResult.getRows().size()))) > 0) {
LOGGER.info("Inside the Pagination code block, Page Token : "+pageToken);
queryPaginationResult = bigquery.jobs().getQueryResults(projectId,query.getJobReference().getJobId()).setPageToken(pageToken).setStartIndex(BigInteger.valueOf(queryResult.getRows().size())).execute();
queryResult.getRows().addAll(queryPaginationResult.getRows());
pageToken = queryPaginationResult.getPageToken();
LOGGER.info("Inside the Pagination code block, total size : "+ queryResult.getTotalRows() + " Current Size : "+ queryResult.getRows().size());
}
}catch(IOException ex){
retryCount ++;
LOGGER.info("BQ Connection Attempt "+retryCount +" failed, Retrying in " + waitTime + " seconds");
if (retryCount == Constant.MAX_RETRY_LIMIT) {
LOGGER.info("BQ Connection Error", ex);
throw ex;
}
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
LOGGER.info("Thread Error");
}
waitTime *= 2;
}
}while((queryResult == null && retryCount < Constant.MAX_RETRY_LIMIT) || (!queryResult.getJobComplete() && retryCount < Constant.MAX_RETRY_LIMIT));
return queryResult.getRows();
查詢中,我沒有得到所有的行沒有任何限制條款在裏面。
目前我們使用的是google-cloud-bigquery的0.5.0版本。
在此先感謝!
謝謝,阮,我試過這個東西,但沒有成功,我面臨着同樣的問題,它看起來像我的流程永遠不會進入分塊的情況下拉扯更多記錄的塊。我也更新了我的代碼,考慮pageToken。 – Amandeep
我看到你將'.setPageToken(queryResult.getPageToken())'添加到了while循環中。但是不會'queryResult.getPageToken()'總是第一頁的頁面標記?我認爲你需要獲得每個getQueryResults調用的查詢標記,並將其放入下一個調用中。 (1)你所期望的行數是多少,(2)你的代碼獲取的行數是多少,(3)什麼是信息日誌輸出它。 –
我得到了你所說的,我可以更新我的代碼,但我沒有看到任何我在while循環中輸入的日誌語句,這迫使我認爲這個問題與分頁無關。 (1)如果我們談論10月14日,我期望的總行數會有所不同,但我期望3978,但只有3972,差異並不總是如此接近,有時我也看到超過500行的差異 – Amandeep