3
我想寫一個上下文管理器來處理GitHub
速率限制異常。本質上,我希望它能夠監聽錯誤,並在發生錯誤時動態調整復位時間(全部通過API執行),然後等待這段時間。在這一點上,我希望它能夠恢復該計劃,並根據需要多次完成這項工作。如何使用Context Manager處理API速率限制?
這是我到目前爲止有:
@contextlib.contextmanager
def api_rate_manager(api_obj: g3.GitHub):
# Check for the API ratelimit being exhausted. Limited to 5k
# requests per hour.
try:
yield
except GitHubError as e:
if 'rate limit exceeded' in e.msg.lower():
info = g3.rate_limit()['resources']['core']
reset = mu.convert_unix_timestamp(info.get('reset'))
delta = reset - datetime.now()
sleep(
delta.seconds + 1) # Add a second to account for milliseconds
目前,它會正確地捕捉錯誤和等待,但隨後它只是退出程序(這是有道理的),而不是盤旋左右回來繼續。我知道我可以在代碼中檢查一下,看看剩下的限制是什麼,等待它達到0,但我想練習上下文管理器。
它將以下面的方式被使用:
with api_rate_manager(gh):
for commit_iter in commit_iters:
handler: gu.EtagHandler = commit_iter.etag_handler
for commit in commit_iter:
if not commit:
continue
commit.refresh()
author_data: dict = commit.commit.author
data = {
'sha': commit.sha,
'author': author_data.get('name'),
'author_email': author_data.get('email'),
'create_date': author_data.get('date'),
'additions': commit.additions,
'deletions': commit.deletions,
'total': commit.total
}
mu.add_etl_fields(data)
writer.writerow(data)
has_data = True
etag: str = commit_iter.get_etag()
if etag:
logger.info(f'Etag for {commit_iter.name}: {etag}')
handler.store_in_db(etag=etag)