2014-01-11 37 views
1

我試圖運行以下代碼,但狀態變量始終爲「PENDING」。你能告訴我我做錯了什麼嗎?BigQuery作業狀態始終爲「PENDING」

// Submit the job. Every later line only inspects the Job object returned by this one call.
Job execute = bigquery.jobs().insert(PROJECT_ID, runJob).execute(); 

// NOTE(review): status is declared without an initial value, yet the loop condition reads it.
String status; 
while(status.equalsIgnoreCase("PENDING")) { 
    // Reads the state from the same Job object on every pass; no new request is issued here.
    status = execute.getStatus().getState(); 
    System.out.println("Status: " + status); 
    // NOTE(review): wait(long) is an instance method inherited from Object, not a static method of Thread.
    Thread.wait(1000); 
} 

回答

3

您的代碼沒有向BigQuery發出請求以獲取更新狀態,它只是檢查插入調用返回的作業狀態。

相反,你應該通過發出jobs.get請求輪詢作業的狀態,並檢查狀態,例如:

// Submit the job. The Job returned by insert() is only used for its initial state and its reference.
Job job = bigquery.jobs().insert(PROJECT_ID, runJob).execute();
// jobs.get expects the bare job id from the job reference. Job.getId() is an opaque identifier
// (it normally carries the project as a prefix), so it is not used for the lookup.
String jobId = job.getJobReference().getJobId();
String status = job.getStatus().getState();
// Poll until the job reports DONE; before that it passes through PENDING and RUNNING.
while (!"DONE".equalsIgnoreCase(status)) {
    // Pause between polls. Thread.sleep is the static pause call; Thread.wait(1000) does not
    // compile because wait() is an instance method of Object. The enclosing method has to
    // handle or declare InterruptedException.
    Thread.sleep(1000);
    // Ask the service for the current state instead of re-reading the stale Job object.
    status = bigquery.jobs().get(PROJECT_ID, jobId).execute().getStatus().getState();
    System.out.println("Status: " + status);
}

*已根據 Jordan Tigani 的評論進行了編輯。

+3

你可能需要考慮對上面的代碼做一處修改:等待作業變爲「DONE」,而不是等待作業不再是「PENDING」,因爲作業還會經歷「RUNNING」狀態。 –

0

我發現,僅僅循環檢查直到狀態變爲「DONE」,並不一定每次都能發現錯誤。有時候,錯誤要在作業進入「DONE」狀態之後才能看到。也就是說,作業在出現某些錯誤時會直接從「PENDING」變爲「DONE」,從而跳過「RUNNING」階段。因此,即使作業已經是「DONE」,最好還是檢查一下 job['status'] 中的錯誤字段。

0

我沒有使用忙等待循環去同步阻塞執行插入的線程,而是改用一個定時調度的線程,並維護一個作業ID隊列。該線程會遍歷這些作業並檢查其狀態,發現錯誤時將其記錄到日誌中。

關鍵部分如下:

  1. 調度一個線程來監視作業

    // Run a JobPoll task repeatedly: first after SCHEDULE_SECONDS seconds, then once every SCHEDULE_SECONDS seconds.
    jobPollScheduler.scheduleAtFixedRate(new JobPoll(), SCHEDULE_SECONDS, SCHEDULE_SECONDS, TimeUnit.SECONDS); 
  2. 循環遍歷作業隊列並檢查它們的進度,把尚未完成的作業重新放回隊列
        // Keep taking entries from the queue until poll() hands back null.
        while ((job = jobs.poll()) != null) {
            final Job polled = bigQuery.jobs().get(projectId, job.jobId).execute();

            if (!"DONE".equals(polled.getStatus().getState())) {
                // Not finished yet: put the id back on the queue and move on to the next entry.
                add(job.jobId);
                logger.debug("will check again, status={}, job={}", polled.getStatus().getState(), job);
                continue;
            }

            // Finished: the presence of an error result decides the log level.
            final ErrorProto failure = polled.getStatus().getErrorResult();
            final boolean hasFailure = failure != null && failure.toString() != null;
            if (hasFailure) {
                logger.error("status={}, errorResult={}, job={}", polled.getStatus().getState(), failure, job);
            } else {
                logger.debug("status={}, job={}", polled.getStatus().getState(), job);
            }
        }
    

    
    import com.google.api.services.bigquery.Bigquery; 
    import com.google.api.services.bigquery.model.ErrorProto; 
    import com.google.api.services.bigquery.model.Job; 
    import com.google.common.primitives.Longs; 
    import com.google.common.util.concurrent.ThreadFactoryBuilder; 
    import org.slf4j.Logger; 
    import org.slf4j.LoggerFactory; 
    
    import java.util.Objects; 
    import java.util.Queue; 
    import java.util.concurrent.DelayQueue; 
    import java.util.concurrent.Delayed; 
    import java.util.concurrent.Executors; 
    import java.util.concurrent.ScheduledExecutorService; 
    import java.util.concurrent.TimeUnit; 
    import java.util.function.Supplier; 
    import javax.annotation.Nonnull; 
    
    /**
     * Monitors BigQuery insert jobs from a background thread.
     *
     * <p>Job ids registered through {@link #add(String)} are held in a {@link DelayQueue}. After
     * {@link #start()} has been called, a single scheduler thread runs every
     * {@code SCHEDULE_SECONDS} seconds, fetches the status of every job whose delay has expired,
     * logs the outcome of finished jobs and re-queues the ones that are still in progress.
     */
    public class BigQueryMonitorSo21064586 {

        private static final Logger logger = LoggerFactory.getLogger(BigQueryMonitorSo21064586.class);

        /** Seconds between polling runs; also the delay applied to every queued job check. */
        private static final int SCHEDULE_SECONDS = 5;

        private final ScheduledExecutorService jobPollScheduler =
                Executors.newSingleThreadScheduledExecutor(
                        new ThreadFactoryBuilder().setNameFormat("big-query-monitory-%d").build());

        // Typed explicitly: with raw Queue/Supplier types, poll() and get() return Object and the
        // assignments in JobPoll.run() do not compile.
        private final Queue<DelayedJobCheck> jobs = new DelayQueue<>();
        private final Supplier<Bigquery> connectionSupplier;
        private final String projectId;

        /**
         * @param connectionSupplier gives us a connection to BigQuery
         * @param projectId Google cloud project
         * @throws NullPointerException if either argument is {@code null}
         */
        public BigQueryMonitorSo21064586(@Nonnull final Supplier<Bigquery> connectionSupplier,
                                         @Nonnull final String projectId) {
            this.connectionSupplier = Objects.requireNonNull(connectionSupplier, "connectionSupplier");
            this.projectId = Objects.requireNonNull(projectId, "projectId");
        }

        /**
         * Schedules the polling task at a fixed rate of {@code SCHEDULE_SECONDS} seconds, with an
         * initial delay of the same length.
         *
         * @return this monitor, so the call can be chained after construction
         */
        public BigQueryMonitorSo21064586 start() {
            jobPollScheduler.scheduleAtFixedRate(new JobPoll(), SCHEDULE_SECONDS, SCHEDULE_SECONDS, TimeUnit.SECONDS);
            return this;
        }

        /**
         * Queues a job for a status check. The check becomes due {@code SCHEDULE_SECONDS} seconds
         * after this call. Failures to enqueue are logged, never thrown.
         *
         * @param jobId insert query job id
         */
        public void add(final String jobId) {
            final DelayedJobCheck job = new DelayedJobCheck(jobId);
            try {
                if (!jobs.offer(job)) {
                    logger.error("could not enqueue BigQuery job, job={}", job);
                }
            } catch (final Exception e) {
                logger.error("failed to add job to queue, job={}", job, e);
            }
        }

        /**
         * Shuts down the polling scheduler. Jobs still in the queue are not checked afterwards.
         */
        public void shutdown() {
            jobPollScheduler.shutdown();
        }

        /**
         * Periodic task that checks every due job once.
         */
        private class JobPoll implements Runnable {

            /**
             * go through the queue and remove anything that is done
             *
             * <p>Every exception is caught and logged: a task scheduled with
             * {@code scheduleAtFixedRate} is not run again once an execution throws.
             * NOTE(review): a job whose status lookup throws has already been taken off the queue
             * and is not re-queued, so it is no longer monitored.
             */
            @Override
            public void run() {
                try {
                    final Bigquery bigQuery = connectionSupplier.get();
                    DelayedJobCheck job;
                    // DelayQueue.poll() only returns entries whose delay has expired, else null.
                    while ((job = jobs.poll()) != null) {
                        final Job statusJob = bigQuery.jobs().get(projectId, job.jobId).execute();
                        if ("DONE".equals(statusJob.getStatus().getState())) {
                            final ErrorProto errorResult = statusJob.getStatus().getErrorResult();
                            if (errorResult == null || errorResult.toString() == null) {
                                logger.debug("status={}, job={}", statusJob.getStatus().getState(), job);
                            } else {
                                logger.error("status={}, errorResult={}, job={}",
                                        statusJob.getStatus().getState(), errorResult, job);
                            }
                        } else {
                            // job isn't done, yet. Add it back to queue.
                            add(job.jobId);
                            logger.debug("will check again, status={}, job={}",
                                    statusJob.getStatus().getState(), job);
                        }
                    }
                } catch (final Exception e) {
                    logger.error("exception monitoring big query status, size={}", jobs.size(), e);
                }
            }
        }

        /**
         * Queue entry for one job id; it becomes due {@code SCHEDULE_SECONDS} seconds after creation.
         */
        private static class DelayedJobCheck extends DelayedImpl {

            private final String jobId;

            DelayedJobCheck(final String jobId) {
                super(SCHEDULE_SECONDS, TimeUnit.SECONDS);
                this.jobId = jobId;
            }

            @Override
            public boolean equals(final Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                if (!super.equals(obj)) {
                    return false;
                }
                final DelayedJobCheck other = (DelayedJobCheck) obj;
                return Objects.equals(jobId, other.jobId);
            }

            @Override
            public int hashCode() {
                return Objects.hash(super.hashCode(), jobId);
            }

            /**
             * Shows the job id so that the {@code job={}} log statements identify the job.
             */
            @Override
            public String toString() {
                return "DelayedJobCheck{jobId=" + jobId + "}";
            }
        }

        /**
         * {@link Delayed} implementation based on an absolute expiry time in wall-clock milliseconds.
         */
        private static class DelayedImpl implements Delayed {

            /**
             * timestamp when delay expires
             */
            private final long expiry;

            /**
             * @param amount how long the delay should be
             * @param timeUnit units of the delay
             */
            DelayedImpl(final long amount, final TimeUnit timeUnit) {
                final long more = TimeUnit.MILLISECONDS.convert(amount, timeUnit);
                expiry = System.currentTimeMillis() + more;
            }

            /**
             * @param unit unit for the result
             * @return time left until expiry, in {@code unit}; zero or negative once expired
             */
            @Override
            public long getDelay(@Nonnull final TimeUnit unit) {
                final long diff = expiry - System.currentTimeMillis();
                return unit.convert(diff, TimeUnit.MILLISECONDS);
            }

            /**
             * Orders by expiry, earliest first. Other {@link Delayed} implementations are compared
             * by remaining delay instead of being cast blindly.
             */
            @Override
            public int compareTo(@Nonnull final Delayed o) {
                if (o instanceof DelayedImpl) {
                    return Longs.compare(expiry, ((DelayedImpl) o).expiry);
                }
                return Longs.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
            }

            @Override
            public boolean equals(final Object obj) {
                if (this == obj) {
                    return true;
                }
                // Exact class match keeps equals symmetric with subclasses that compare getClass().
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                final DelayedImpl delayed = (DelayedImpl) obj;
                return expiry == delayed.expiry;
            }

            @Override
            public int hashCode() {
                return Objects.hash(expiry);
            }
        }
    }