python - Handling HTTP requests asynchronously

I need to generate a PDF report for each entry of a Django queryset. There will be between 30,000 and 40,000 entries.
The PDFs are generated through an external API. Since they are currently produced on demand, they are handled synchronously via an HTTP request/response cycle. This task will be different, because I think I will use a Django management command to loop over the queryset and run the PDF generation.
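For context, I picture the command wrapper as something roughly like this (a sketch only; the command name and import paths are placeholders, and Checker is the class I show later in this post):

# management/commands/generate_pdf_reports.py -- hypothetical command name
from django.core.management.base import BaseCommand

from myapp.models import MyModel    # placeholder import path
from myapp.reports import Checker   # placeholder path for the class shown further below


class Command(BaseCommand):
    help = "Generate a PDF report for every entry of the queryset"

    def handle(self, *args, **options):
        # loop over all entries and generate one PDF each
        uuids = MyModel.objects.values_list('uuid', flat=True)
        checker = Checker()
        checker.run_all(uuids=uuids)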
Which approach should I follow for this task? I have thought of two possible solutions, although they involve technologies I have never used:
1) Celery: assign tasks (HTTP requests with different payloads) to workers, then retrieve the results once they are done (a rough sketch of what I have in mind follows after this list).
2) requests-futures: use requests in a non-blocking way.
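For option 1, I picture the worker task as a thin wrapper around the per-entry HTTP call, roughly like the sketch below (the task name and import path are placeholders; nothing here exists yet):

# tasks.py -- rough sketch of the Celery variant
from celery import shared_task

from myapp.reports import Checker  # placeholder path for the class shown further below


@shared_task(bind=True, max_retries=3)
def generate_pdf_report(self, uuid):
    try:
        # one worker handles one POST + poll + download cycle per entry
        Checker().run_single(uuid)
    except Exception as exc:
        # back off and retry if the external API fails transiently
        raise self.retry(exc=exc, countdown=60)

The management command would then just call generate_pdf_report.delay(uuid) for every uuid and let the number of workers control the concurrency.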
The goal is to use the API concurrently (e.g. send 10 or 100 HTTP requests at the same time, depending on how many concurrent requests the API can handle); a sketch of what I have in mind for this follows below.
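To make option 2 concrete, this is roughly how I imagine the concurrent POSTs (a sketch only; endpoint_url, headers and payloads stand in for the real API parameters):

# sketch of bounded concurrency with requests-futures
import json

from requests_futures.sessions import FuturesSession


def post_all(endpoint_url, headers, payloads, max_workers=10):
    # max_workers bounds how many requests are in flight at the same time
    session = FuturesSession(max_workers=max_workers)
    futures = [
        session.post(endpoint_url, headers=headers, data=json.dumps(payload))
        for payload in payloads
    ]
    # the sends do not block; collect the responses afterwards
    return [future.result() for future in futures]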
Has anyone here dealt with a similar task and can advise on how to proceed? (Note: most of the code is reused rather than written by me from scratch, since I took over ownership of this project.)
As a first attempt, I came up with the following using multiprocessing:
# imports the snippet relies on (assuming timer is timeit's default_timer);
# MyModel, TEMP_DIR_PREFIX and a_local_url are project-specific names used below
import json
import sys
import time
from multiprocessing import Pool
from tempfile import mkdtemp
from timeit import default_timer as timer

import requests
from django.conf import settings
from django.shortcuts import get_object_or_404
from django.utils.text import slugify


class Checker(object):

    def __init__(self, *args, **kwargs):
        # ... various setup
        pass

    # other methods
    # .....
    def run_single(self, uuid, verbose=False):
        """
        run a single PDF generation and local download
        """
        start = timer()
        headers = self.headers
        data, obj = self.get_review_data(uuid)
        if verbose:
            print("** Report: {} **".format(obj))
        response = requests.post(
            url=self.endpoint_url,
            headers=headers,
            data=json.dumps(data)
        )
        if verbose:
            print('POST - Response: {} \n {} \n {} secs'.format(
                response.status_code,
                response.content,
                response.elapsed.total_seconds())
            )
        run_url = self.check_progress(post_response=response, verbose=True)
        if run_url:
            self.get_file(run_url, obj, verbose=True)
        print("*** Download {}in {} secs".format("(verbose) " if verbose else "", timer() - start))
    def run_all(self, uuids, verbose=True):
        start = timer()
        for obj_uuid in uuids:
            self.run_single(obj_uuid, verbose=verbose)
        print("\n\n### Downloaded {}{} reviews in {} secs".format(
            "(verbose) " if verbose else "",
            len(uuids),
            timer() - start)
        )
    def run_all_multi(self, uuids, workers=4, verbose=True):
        # fan run_single out over a pool of worker processes
        pool = Pool(processes=workers)
        pool.map(self.run_single, uuids)
    def check_progress(self, post_response, attempts_limit=10000, verbose=False):
        """
        check the progress of PDF generation querying periodically the API endpoint
        """
        if post_response.status_code != 200:
            if verbose:
                print("POST response status code != 200 - exit")
            return None
        # the POST response links to the job status endpoint
        url = 'https://apidomain.com/{path}'.format(
            path=post_response.json().get('links', {}).get('self', {}).get('href')
        )
        job_id = post_response.json().get('jobId', '')
        status = 'Running'
        attempt_counter = 0
        start = timer()
        if verbose:
            print("GET - url: {}".format(url))
        while status == 'Running':
            attempt_counter += 1
            job_response = requests.get(
                url=url,
                headers=self.headers,
            )
            job_data = job_response.json()
            status = job_data['status']
            message = job_data['message']
            progress = job_data['progress']
            if status == 'Error':
                if verbose:
                    end = timer()
                    print(
                        '{sc} - job_id: {job_id} - error_id: [{error_id}]: {message}'.format(
                            sc=job_response.status_code,
                            job_id=job_id,
                            error_id=job_data['errorId'],
                            message=message
                        ), '{} secs'.format(end - start)
                    )
                    print('Attempts: {} \n {}% progress'.format(attempt_counter, progress))
                return None
            if status == 'Complete':
                if verbose:
                    end = timer()
                    print('job_id: {job_id} - Complete - {secs} secs'.format(
                        job_id=job_id,
                        secs=end - start)
                    )
                    print('Attempts: {}'.format(attempt_counter))
                    print('{url}/files/'.format(url=url))
                return '{url}/files/'.format(url=url)
            if attempt_counter >= attempts_limit:
                if verbose:
                    end = timer()
                    print('File failed to generate after {att_limit} retrieve attempts: ({progress}% progress)'
                          ' - {message}'.format(
                              att_limit=attempts_limit,
                              progress=int(progress * 100),
                              message=message
                          ), '{} secs'.format(end - start))
                return None
            if verbose:
                print('{}% progress - attempts: {}'.format(progress, attempt_counter), end='\r')
                sys.stdout.flush()
            time.sleep(1)
        if verbose:
            end = timer()
            print(status, 'message: {} - attempts: {} - {} secs'.format(message, attempt_counter, end - start))
        return None
    def get_review_data(self, uuid, host=None, protocol=None):
        review = get_object_or_404(MyModel, uuid=uuid)
        internal_api_headers = {
            'Authorization': 'Token {}'.format(
                review.employee.csod_profile.csod_user_token
            )
        }
        # fetch the report payload from an internal endpoint
        data = requests.get(
            url=a_local_url,
            params={'format': 'json', 'indirect': 'true'},
            headers=internal_api_headers,
        ).json()
        return (data, review)
    def get_file(self, runs_url, obj, verbose=False):
        runs_files_response = requests.get(
            url=runs_url,
            headers=self.headers,
            stream=True,
        )
        runs_files_data = runs_files_response.json()
        file_path = runs_files_data['files'][0]['links']['file']['href']  # remote generated file URI
        file_response_url = 'https://apidomain.com/{path}'.format(path=file_path)
        file_response = requests.get(
            url=file_response_url,
            headers=self.headers,
            params={'userId': settings.CREDENTIALS['userId']},
            stream=True,
        )
        if file_response.status_code != 200:
            if verbose:
                print('error in retrieving file for {r_id}\nurl: {url}\n'.format(
                    r_id=obj.uuid, url=file_response_url)
                )
        local_file_path = '{temp_dir}/{uuid}-{filename}-{employee}.pdf'.format(
            temp_dir=self.local_temp_dir,
            uuid=obj.uuid,
            employee=slugify(obj.employee.get_full_name()),
            filename=slugify(obj.task.name)
        )
        # stream the PDF to a local file in 1 KB chunks
        with open(local_file_path, 'wb') as f:
            for block in file_response.iter_content(1024):
                f.write(block)
        if verbose:
            print('\n --> {r} [{uuid}]'.format(r=obj, uuid=obj.uuid))
            print('\n --> File downloaded: {path}'.format(path=local_file_path))
    @classmethod
    def get_temp_directory(cls):
        """
        generate a local unique temporary directory
        """
        return '{temp_dir}/'.format(
            temp_dir=mkdtemp(dir=TEMP_DIR_PREFIX),
        )
if __name__ == "__main__":
    uuids = #list or generator of objs uuids
    checker = Checker()
    checker.run_all_multi(uuids=uuids)
Unfortunately, running checker.run_all_multi has the following effects:
- the Python shell freezes;
- no output is printed;
- no files are generated;
- I have to kill the console, because the usual keyboard interrupt stops working.
Meanwhile, running checker.run_all works fine (one entry at a time).
Any suggestions about why this code does not work (rather than about what I could use instead of multiprocessing)?
Thanks everyone.
How often do you need to generate these reports? Is the generation triggered manually or automatically? –
Once a year, manually – Luke
At that frequency I would lean towards using requests-futures and avoiding the need to set up RabbitMQ etc. – Anentropic