2015-12-06 61 views
7

How to cache asyncio coroutines

I am using aiohttp to make a simple HTTP request in Python 3.4 like this:

response = yield from aiohttp.get(url) 

The application requests the same URL over and over again, so naturally I wanted to cache it. My first attempt was something like this:

@functools.lru_cache(maxsize=128) 
def cached_request(url): 
    return aiohttp.get(url) 

The first call to cached_request works fine, but on later calls I end up with None instead of the response object.

I am relatively new to asyncio, so I tried many combinations of the asyncio.coroutine decorator, yield from, and some other things, but none seemed to work.

So how does caching coroutines work?
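The behavior can be reproduced with plain generators standing in for Python 3.4's generator-based coroutines (a sketch of my own, not from the thread): lru_cache hands back the same coroutine object on every call, and yield from over an already-exhausted generator completes immediately with None.

```python
def request():
    # stand-in for a generator-based coroutine such as aiohttp.get(url)
    return "response"
    yield  # unreachable; makes this a generator function

def fetch(coro):
    # consume the coroutine the way "yield from aiohttp.get(url)" would
    result = yield from coro
    return result

def run(gen):
    """Drive a generator to completion and return its return value."""
    try:
        while True:
            next(gen)
    except StopIteration as e:
        return e.value

coro = request()         # lru_cache would hand back this same object every time
print(run(fetch(coro)))  # response
print(run(fetch(coro)))  # None -- the generator is already exhausted
```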

+0

Not sure what you mean by caching the coroutine? E.g. saving it as a variable so you can call it repeatedly? Saving the result until it is replaced by a later execution? Or having the same coroutine repeat later? – shongololo

+0

@shongololo I want to cache the result of the coroutine. – tobib

+1

I'm not familiar with functools.lru_cache(), but if you simply want to return updated results, is there any reason not to just save the updated results to a variable? Regardless, when using an asynchronous method (like aiohttp.get()) you have to drive it with something, so cached_request must be wrapped with @asyncio.coroutine; it must be called with yield from; and the return statement should be constructed along the lines of return (yield from aiohttp.get(url)) – shongololo

Answers

3

I wrote a simple cache decorator myself:

def async_cache(maxsize=128):
    cache = {}

    def decorator(fn):
        def wrapper(*args):
            key = ':'.join(args)

            if key not in cache:
                if len(cache) >= maxsize:
                    # evict the oldest entry (next(iter(...)) is the
                    # Python 3 way to get the first key)
                    del cache[next(iter(cache))]

                cache[key] = yield from fn(*args)

            return cache[key]

        return wrapper

    return decorator


@async_cache() 
@asyncio.coroutine 
def expensive_io(): 
    .... 

This kind-of-works. But it could probably be improved in many ways. For example: if the cached function is called a second time before the first call returns, it will execute again.
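One way to close that gap (a sketch of my own, not part of the answer above, using modern async/await syntax) is to cache the Task itself rather than the awaited result, so a second call that arrives while the first is still in flight gets the same Task back:

```python
import asyncio

def async_cache(maxsize=128):
    """Variant sketch: store the Task, not the result, so concurrent
    calls with the same key share one in-flight execution."""
    cache = {}

    def decorator(fn):
        def wrapper(*args):
            key = ':'.join(map(str, args))
            if key not in cache:
                if len(cache) >= maxsize:
                    del cache[next(iter(cache))]
                cache[key] = asyncio.ensure_future(fn(*args))
            return cache[key]
        return wrapper
    return decorator

calls = 0

@async_cache()
async def expensive_io(x):
    global calls
    calls += 1
    await asyncio.sleep(0)
    return x * 2

async def main():
    # two concurrent calls before either finishes -> one execution
    a, b = await asyncio.gather(expensive_io(3), expensive_io(3))
    return a, b

result = asyncio.run(main())
print(result, calls)  # (6, 6) 1
```

Awaiting a finished Task simply returns its result again, which is what makes caching the Task safe for repeat callers.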

+0

Suggestion: use [`OrderedDict`](https://docs.python.org/3/library/collections.html#collections.OrderedDict) to implement lru behavior, i.e. call OrderedDict.move_to_end on the key on every call, then use OrderedDict.popitem when the cache is full. –
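The OrderedDict approach the comment suggests might look like this minimal sketch (class and method names are illustrative):

```python
from collections import OrderedDict

class LRUDict:
    """Minimal LRU mapping sketch following the comment's suggestion."""
    def __init__(self, maxsize=128):
        self.data = OrderedDict()
        self.maxsize = maxsize

    def get(self, key):
        self.data.move_to_end(key)   # mark as most recently used
        return self.data[key]

    def put(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        if len(self.data) > self.maxsize:
            self.data.popitem(last=False)   # evict least recently used

lru = LRUDict(maxsize=2)
lru.put('a', 1)
lru.put('b', 2)
lru.get('a')       # 'a' becomes most recently used
lru.put('c', 3)    # evicts 'b', the least recently used key
print(sorted(lru.data))  # ['a', 'c']
```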

0

I'm not that familiar with aiohttp, so I'm not sure exactly what is happening that would cause Nones to be returned, but the lru_cache decorator will not work with async functions.

I use a decorator which does essentially the same thing; note that it is different to tobib's decorator above in that it will always return a future or a task, rather than the value:

import asyncio
from collections import OrderedDict
from functools import _make_key, wraps

def future_lru_cache(maxsize=128):
    # support use as decorator without calling, for this case maxsize will
    # not be an int
    try:
        real_max_size = int(maxsize)
    except (TypeError, ValueError):
        real_max_size = 128

    cache = OrderedDict()

    async def run_and_cache(func, args, kwargs):
        """Run func with the specified arguments and store the result
        in cache."""
        result = await func(*args, **kwargs)
        cache[_make_key(args, kwargs, False)] = result
        if len(cache) > real_max_size:
            cache.popitem(False)
        return result

    def wrapper(func):
        @wraps(func)
        def decorator(*args, **kwargs):
            key = _make_key(args, kwargs, False)
            if key in cache:
                # Some protection against duplicating calls already in
                # progress: when starting the call cache the future, and if
                # the same thing is requested again return that future.
                if isinstance(cache[key], asyncio.Future):
                    return cache[key]
                else:
                    f = asyncio.Future()
                    f.set_result(cache[key])
                    return f
            else:
                task = asyncio.Task(run_and_cache(func, args, kwargs))
                cache[key] = task
                return task
        return decorator

    if callable(maxsize):
        return wrapper(maxsize)
    else:
        return wrapper
I used _make_key from functools just as lru_cache does; I suppose it is meant to be private, so it is probably better to copy it.
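If copying the private _make_key feels fragile, a simple stand-in is easy to write (a sketch; it ignores the typed/fast-path handling the real helper has):

```python
def make_key(args, kwargs):
    """Build a hashable cache key from call arguments; a simplified
    stand-in for the private functools._make_key."""
    key = args
    if kwargs:
        # sort so keyword order does not change the key
        key += tuple(sorted(kwargs.items()))
    return key

print(make_key((1, 2), {'b': 3, 'a': 4}))  # (1, 2, ('a', 4), ('b', 3))
```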

0

Another variant of an lru decorator, which caches coroutines that have not finished yet; it is very useful with parallel requests for the same key:

import asyncio
from collections import OrderedDict
from functools import _make_key, wraps

def async_cache(maxsize=128, event_loop=None):
    cache = OrderedDict()
    if event_loop is None:
        event_loop = asyncio.get_event_loop()
    awaiting = dict()

    async def run_and_cache(func, args, kwargs):
        """await func with the specified arguments and store the result
        in cache."""
        result = await func(*args, **kwargs)
        key = _make_key(args, kwargs, False)
        cache[key] = result
        if len(cache) > maxsize:
            cache.popitem(False)
        cache.move_to_end(key)
        return result

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = _make_key(args, kwargs, False)
            if key in cache:
                return cache[key]
            if key in awaiting:
                task = awaiting[key]
                return await asyncio.wait_for(task, timeout=None, loop=event_loop)
            task = asyncio.ensure_future(run_and_cache(func, args, kwargs), loop=event_loop)
            awaiting[key] = task
            result = await asyncio.wait_for(task, timeout=None, loop=event_loop)
            del awaiting[key]
            return result
        return wrapper

    return decorator


async def test_async_cache(event_loop):
    counter = 0
    n, m = 10, 3

    @async_cache(maxsize=n, event_loop=event_loop)
    async def cached_function(x):
        nonlocal counter
        await asyncio.sleep(0)  # making event loop switch to other coroutine
        counter += 1
        return x

    tasks = [asyncio.ensure_future(cached_function(x), loop=event_loop)
             for x in list(range(n)) * m]
    done, pending = await asyncio.wait(tasks, loop=event_loop, timeout=1)
    assert len(done) == n * m
    assert counter == n

event_loop = asyncio.get_event_loop() 
task = asyncio.ensure_future(test_async_cache(event_loop)) 
event_loop.run_until_complete(task) 
2

Maybe a bit late, but I have started a new package that may help: https://github.com/argaen/aiocache. Contributions/comments are always welcome.

An example:

import asyncio 

from collections import namedtuple 

from aiocache import cached 
from aiocache.serializers import PickleSerializer 

Result = namedtuple('Result', "content, status") 


@cached(ttl=10, serializer=PickleSerializer()) 
async def async_main(): 
    print("First ASYNC non cached call...") 
    await asyncio.sleep(1) 
    return Result("content", 200) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 

Note that, as an extra, it can cache any python object into redis using pickle serialization. In case you just want to work with memory, you can use the memory backend :).
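As a rough idea of what a ttl-based decorator like the one above does internally, here is a minimal in-memory sketch of my own (aiocache layers serializers and pluggable backends such as memory and Redis on top of this idea):

```python
import asyncio
import functools
import time

def ttl_cached(ttl):
    """Minimal in-memory TTL cache decorator -- a sketch of the idea only."""
    def decorator(fn):
        store = {}  # key -> (expires_at, value)

        @functools.wraps(fn)
        async def wrapper(*args):
            now = time.monotonic()
            if args in store and store[args][0] > now:
                return store[args][1]  # still fresh, return cached value
            value = await fn(*args)
            store[args] = (now + ttl, value)
            return value
        return wrapper
    return decorator

call_count = 0

@ttl_cached(ttl=60)
async def fetch(x):
    global call_count
    call_count += 1
    return x

async def main():
    return await fetch(1), await fetch(1)

result = asyncio.run(main())
print(result, call_count)  # (1, 1) 1
```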

1

To use functools.lru_cache with coroutines, the following code works.

import asyncio
import functools

import aiohttp

class Cacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = asyncio.Lock()

    def __await__(self):
        with (yield from self.lock):
            if self.done:
                return self.result
            self.result = yield from self.co.__await__()
            self.done = True
            return self.result

def cacheable(f):
    def wrapped(*args, **kwargs):
        r = f(*args, **kwargs)
        return Cacheable(r)
    return wrapped


@functools.lru_cache()
@cacheable
async def foo():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

The following is thread-safe:

import asyncio
import threading

class ThreadSafeCacheable:
    def __init__(self, co):
        self.co = co
        self.done = False
        self.result = None
        self.lock = threading.Lock()

    def __await__(self):
        while True:
            if self.done:
                return self.result
            if self.lock.acquire(blocking=False):
                self.result = yield from self.co.__await__()
                self.done = True
                return self.result
            else:
                yield from asyncio.sleep(0.005)