-2

我需要爲django queryset的每個條目生成PDF報告。將有介於3萬到4萬之間的條目。python - 異步處理HTTP請求

PDF是通過外部API生成的。由於當前是按需生成的,因此通過HTTP請求/響應同步處理。 這對於這個任務會有所不同,因爲我認爲我將使用django管理命令循環查詢集並執行PDF生成。

這個任務應該遵循哪種方法?我想到了兩種可能的解決方案,雖然是我從未使用過的技術:

1)Celery:向工作人員分配任務(具有不同有效負載的http請求),然後在完成後檢索它。

2)request-futures:以非阻塞方式使用請求。

目標是同時使用API​​(例如,根據API可以處理多少個併發請求,同時發送10或100個http請求)。

任何人在這裏處理類似的任務,並可以提供建議,如何繼續這個? (:大部分代碼的重複使用,而不是我自己寫的,因爲我把這個項目的所有權注。):

是第一次嘗試,與multiprocessing提出以下

class Checker(object): 

    def __init__(self, *args, **kwargs): 
     # ... various setup 

    # other methods 
    # ..... 

    def run_single(self, uuid, verbose=False): 
     """ 
     run a single PDF generation and local download 
     """ 
     start = timer() 
     headers = self.headers 

     data, obj = self.get_review_data(uuid) 
     if verbose: 
      print("** Report: {} **".format(obj)) 
     response = requests.post(
      url=self.endpoint_url, 
      headers=headers, 
      data=json.dumps(data) 
     ) 
     if verbose: 
      print('POST - Response: {} \n {} \n {} secs'.format(
       response.status_code, 
       response.content, 
       response.elapsed.total_seconds()) 
      ) 
     run_url = self.check_progress(post_response=response, verbose=True) 
     if run_url: 
      self.get_file(run_url, obj, verbose=True) 
     print("*** Download {}in {} secs".format("(verbose) " if verbose else "", timer()-start)) 


    def run_all(self, uuids, verbose=True): 
     start = timer() 
     for obj_uuid in review_uuids: 
      self.run_single(obj_uuid, verbose=verbose) 
     print("\n\n### Downloaded {}{} reviews in {} secs".format(
      "(verbose) " if verbose else "", 
      len(uuids), 
      timer() - start) 
     ) 

    def run_all_multi(self, uuids, workers=4, verbose=True): 
     pool = Pool(processes=workers) 
     pool.map(self.run_single, uuids) 


    def check_progress(self, post_response, attempts_limit=10000, verbose=False): 
     """ 
     check the progress of PDF generation querying periodically the API endpoint 
     """ 
     if post_response.status_code != 200: 
      if verbose: print("POST response status code != 200 - exit") 
      return None 
     url = 'https://apidomain.com/{path}'.format(
      domain=self.domain, 
      path=post_response.json().get('links', {}).get('self', {}).get('href'), 
      headers = self.headers 
     ) 
     job_id = post_response.json().get('jobId', '') 
     status = 'Running' 
     attempt_counter = 0 
     start = timer() 
     if verbose: 
      print("GET - url: {}".format(url)) 
     while status == 'Running': 
      attempt_counter += 1 
      job_response = requests.get(
       url=url, 
       headers=self.headers, 
      ) 
      job_data = job_response.json() 
      status = job_data['status'] 
      message = job_data['message'] 
      progress = job_data['progress'] 
      if status == 'Error': 
       if verbose: 
        end = timer() 
        print(
         '{sc} - job_id: {job_id} - error_id: [{error_id}]: {message}'.format(
          sc=job_response.status_code, 
          job_id=job_id, 
          error_id=job_data['errorId'], 
          message=message 
         ), '{} secs'.format(end - start) 
        ) 
        print('Attempts: {} \n {}% progress'.format(attempt_counter, progress)) 
       return None 
      if status == 'Complete': 
       if verbose: 
        end = timer() 
        print('run_id: {run_id} - Complete - {secs} secs'.format(
         run_id=run_id, 
         secs=end - start) 
        ) 
        print('Attempts: {}'.format(attempt_counter)) 
        print('{url}/files/'.format(url=url)) 
       return '{url}/files/'.format(url=url) 
      if attempt_counter >= attempts_limit: 
       if verbose: 
        end = timer() 
        print('File failed to generate after {att_limit} retrieve attempts: ({progress}% progress)' \ 
          ' - {message}'.format(
           att_limit=attempts_limit, 
           progress=int(progress * 100), 
           message=message 
         ), '{} secs'.format(end-start)) 
       return None 
      if verbose: 
       print('{}% progress - attempts: {}'.format(progress, attempt_counter), end='\r') 
       sys.stdout.flush() 
      time.sleep(1) 
     if verbose: 
      end = timer() 
      print(status, 'message: {} - attempts: {} - {} secs'.format(message, attempt_counter, end - start)) 
     return None 

    def get_review_data(self, uuid, host=None, protocol=None): 
     review = get_object_or_404(MyModel, uuid) 
     internal_api_headers = { 
      'Authorization': 'Token {}'.format(
       review.employee.csod_profile.csod_user_token 
      ) 
     } 

     data = requests.get(
      url=a_local_url, 
      params={'format': 'json', 'indirect': 'true'}, 
      headers=internal_api_headers, 
     ).json() 
     return (data, review) 

    def get_file(self, runs_url, obj, verbose=False): 

     runs_files_response = requests.get(
      url=runs_url, 
      headers=self.headers, 
      stream=True, 
     ) 

     runs_files_data = runs_files_response.json() 


     file_path = runs_files_data['files'][0]['links']['file']['href'] # remote generated file URI 
     file_response_url = 'https://apidomain.com/{path}'.format(path=file_path) 
     file_response = requests.get(
      url=file_response_url, 
      headers=self.headers, 
      params={'userId': settings.CREDENTIALS['userId']}, 
      stream=True, 
     ) 
     if file_response.status_code != 200: 
      if verbose: 
       print('error in retrieving file for {r_id}\nurl: {url}\n'.format(
        r_id=obj.uuid, url=file_response_url) 
       ) 
     local_file_path = '{temp_dir}/{uuid}-{filename}-{employee}.pdf'.format(
      temp_dir=self.local_temp_dir, 
      uuid=obj.uuid, 
      employee=slugify(obj.employee.get_full_name()), 
      filename=slugify(obj.task.name) 
     ) 
     with open(local_file_path, 'wb') as f: 
      for block in file_response.iter_content(1024): 
       f.write(block) 
      if verbose: 
       print('\n --> {r} [{uuid}]'.format(r=review, uuid=obj.uuid)) 
       print('\n --> File downloaded: {path}'.format(path=local_file_path)) 

    @classmethod 
    def get_temp_directory(self): 
     """ 
     generate a local unique temporary directory 
     """ 
     return '{temp_dir}/'.format(
      temp_dir=mkdtemp(dir=TEMP_DIR_PREFIX), 
     ) 

if __name__ == "__main__": 
    uuids = #list or generator of objs uuids 
    checker = Checker() 
    checker.run_all_multi(uuids=uuids) 

不幸的是,跑checker.run_all_multi有以下效果

  • python shell freeze;
  • 不打印輸出;
  • 沒有生成文件;
  • 我不得不殺了命令行控制檯,正常的鍵盤中斷停止工作

,同時運行checker.run_all不正常工作(一一)。

任何有關爲什麼此代碼不起作用的建議(而不是關於我可以用來代替多處理的)?

謝謝大家。

+0

您需要多久生成一次這些報告?這一代是手動還是自動觸發的? –

+0

- 每年一次 - 手動 – Luke

+0

在那個頻率我傾向於使用請求期貨和避免需要設置rabbitmq等 – Anentropic

回答

1

隨着你的頻率,每年手動一次&。你不需要芹菜或請求期貨。

創建像

def record_to_pdf(record): 
    # create pdf from record 

的方法然後創建與代碼的管理命令(使用multiprocessing.Pool

from multiprocessing import Pool 
pool = Pool(processes=NUMBER_OF_CORES) 
pool.map(record_to_pdf, YOUR_QUERYSET) 

管理命令將不會被異步雖然。要使其異步,可以在後臺運行它。另外,如果你的進程沒有CPU綁定(比如,它只是調用一些API),那麼@Anentropic建議你可以在創建池時嘗試更多的進程。

+0

對於不是cpu-bound的任務,你也可以試驗一些prosees> NUMBER_OF_CORES – Anentropic

+0

@Antropic你說得對,它的方法'record_to_pdf'只調用一些API,然後可以通過調用一些API來增加進程一個重要的數字(受網絡速度和API速率限制)。 –

+0

試過。它不會向stdout輸出任何東西,它不會將任何文件保存到目標目錄,並凍結shell(我需要使用kill -9來殺死它)。 相同的代碼沒有多處理,按順序處理每個項目。 我可以粘貼代碼。有任何想法嗎? – Luke