另一種變體:一個 LRU 裝飾器,它還會共享尚未完成的協程,當多個並行請求使用相同的鍵時非常有用:
import asyncio
from collections import OrderedDict
from functools import _make_key, wraps
def async_cache(maxsize=128, event_loop=None):
    """Decorator factory: an LRU cache for coroutine functions.

    Results of completed calls are cached (up to *maxsize* entries; the
    least-recently-used entry is evicted first).  While a call for a given
    key is still in flight, concurrent callers with the same arguments
    await the same task instead of invoking the coroutine again.

    The *event_loop* parameter is accepted for backward compatibility but
    is no longer used: explicit ``loop`` arguments were removed from the
    asyncio APIs in Python 3.10, and tasks are always scheduled on the
    currently running loop.
    """
    cache = OrderedDict()  # key -> cached result, in recency order
    in_flight = {}         # key -> Task for a call that has not finished

    async def run_and_cache(func, args, kwargs):
        """Await func(*args, **kwargs) and store the result in cache."""
        key = _make_key(args, kwargs, False)
        try:
            result = await func(*args, **kwargs)
        finally:
            # Drop the in-flight entry even on error/cancellation, so a
            # failed call can be retried instead of re-raising forever.
            in_flight.pop(key, None)
        cache[key] = result
        if len(cache) > maxsize:
            cache.popitem(last=False)  # evict the least-recently-used entry
        return result

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = _make_key(args, kwargs, False)
            if key in cache:
                cache.move_to_end(key)  # refresh recency on a cache hit
                return cache[key]
            if key in in_flight:
                # The same key is already being computed: share its result
                # rather than running the coroutine a second time.
                return await in_flight[key]
            task = asyncio.ensure_future(run_and_cache(func, args, kwargs))
            in_flight[key] = task
            return await task
        return wrapper

    return decorator
async def test_async_cache(event_loop):
    """Exercise async_cache: n distinct keys requested m times concurrently
    must complete n*m tasks while invoking the underlying coroutine only
    n times.

    *event_loop* is kept in the signature for backward compatibility; the
    coroutine always runs on the loop it was started from (passing
    ``loop=`` to asyncio APIs was removed in Python 3.10).
    """
    counter = 0
    n, m = 10, 3

    @async_cache(maxsize=n, event_loop=event_loop)
    async def cached_function(x):
        nonlocal counter
        # Yield control so concurrent requests for the same key overlap.
        await asyncio.sleep(0)
        counter += 1
        return x

    tasks = [asyncio.ensure_future(cached_function(x))
             for x in list(range(n)) * m]
    done, pending = await asyncio.wait(tasks, timeout=1)
    assert len(done) == n * m
    assert counter == n
if __name__ == "__main__":
    # asyncio.get_event_loop() with no running loop is deprecated since
    # Python 3.10; create a fresh loop explicitly and close it when done.
    # The __main__ guard keeps the test from running on mere import.
    event_loop = asyncio.new_event_loop()
    try:
        event_loop.run_until_complete(test_async_cache(event_loop))
    finally:
        event_loop.close()
來源
2016-09-22 01:16:44
vvd
不知道你的緩存協同程序是什麼意思?例如將它保存爲一個變量,以便您可以重複調用它?保存結果,直到結果在稍後執行時被替換?或者稍後有相同的協程重複? – shongololo
@shongololo我想緩存協程的結果。 – tobib
我對functools.lru_cache()並不熟悉,但是如果您只是想返回更新後的結果,那麼是否有任何理由不將更新後的結果保存到變量中?儘管如此,當使用異步方法時(比如 'aiohttp.get()'),你必須用某種方法驅動它。所以cached_request必須用 '@asyncio.coroutine' 封裝;它必須用 'yield from' 來調用;並且return語句應該寫成 'return (yield from aiohttp.get(url))' 的形式。 – shongololo