2015-12-06 11 views
7

sto usando aiohttp fare una semplice richiesta HTTP in Python 3.4 come questo:come memorizzare nella cache asyncio coroutine

response = yield from aiohttp.get(url) 

L'applicazione richiede lo stesso URL più e più volte così naturalmente ho voluto mettere in cache esso. Il mio primo tentativo è stato qualcosa di simile:

@functools.lru_cache(maxsize=128) 
def cached_request(url): 
    return aiohttp.get(url) 

La prima chiamata a cached_request funziona bene, ma nelle chiamate successive io alla fine con None anziché l'oggetto di risposta.

Sono piuttosto nuovo ad asyncio quindi ho provato molte combinazioni del decoratore asyncio.coroutine, yield from e alcune altre cose, ma nessuna sembrava funzionare.

Quindi, come funziona la cache delle coroutine?

+0

Non sei sicuro di cosa si intende per il caching un coroutine? per esempio. Salvalo come variabile in modo che tu possa chiamarlo ripetutamente? Salva il risultato, finché il risultato non viene sostituito in un'esecuzione successiva? Oppure ripetere la stessa coroutine in un secondo momento? – shongololo

+0

@shongololo Voglio memorizzare nella cache il risultato della coroutine. – tobib

+1

Non ho familiarità con functools.lru_cache() ma se si desidera semplicemente restituire risultati aggiornati, c'è qualche ragione per cui non si limitano a salvare i risultati aggiornati in una variabile? Tuttavia, quando si utilizza un metodo asincrono (come 'aiohttp.get()') devi guidarlo con qualcosa. Quindi la richiesta in cache deve essere inclusa con '@ asyncio.coroutine'; deve essere chiamato usando 'yield from'; e l'istruzione return dovrebbe essere incorniciata lungo le linee di 'return (yield from aiohttp.get (url))' – shongololo

risposta

3

ho scritto un semplice cache di decoratore di me stesso:

def async_cache(maxsize=128): 
    cache = {} 

    def decorator(fn): 
     def wrapper(*args):               
      key = ':'.join(args) 

      if key not in cache: 
       if len(cache) >= maxsize: 
        del cache[cache.keys().next()] 

       cache[key] = yield from fn(*args) 

      return cache[key] 

     return wrapper 

    return decorator 


@async_cache() 
@asyncio.coroutine 
def expensive_io(): 
    .... 

Questo tipo-di-opere. Ma molti aspetti possono probabilmente essere migliorati. Ad esempio: se la funzione memorizzata nella cache viene richiamata una seconda volta prima che la prima chiamata torni, verrà eseguita una seconda volta.

+0

Suggerimento: usa un ['OrderedDict'] (https://docs.python.org/3/library/collections .html # collections.OrderedDict) per implementare il comportamento 'lru', ovvero utilizzare' OrderedDict.move_to_end' su ogni chiave chiamata e quindi 'OrderedDict.popitem' quando la cache è piena. –

0

Non sono così familiare con aiohttp, quindi non sono sicuro di esattamente cosa sta succedendo che provocherebbe la restituzione di Nones, ma il decoratore lru_cache non funzionerà con le funzioni asincrone.

Io uso un decoratore che fa essenzialmente la stessa cosa; notare che è diverso da decoratore di tobib sopra in quanto restituirà sempre un futuro o un'attività, piuttosto che il valore:

from collections import OrderedDict 
from functools import _make_key, wraps 

def future_lru_cache(maxsize=128): 
    # support use as decorator without calling, for this case maxsize will 
    # not be an int 
    try: 
     real_max_size = int(maxsize) 
    except ValueError: 
     real_max_size = 128 

    cache = OrderedDict() 

    async def run_and_cache(func, args, kwargs): 
     """Run func with the specified arguments and store the result 
     in cache.""" 
     result = await func(*args, **kwargs) 
     cache[_make_key(args, kwargs, False)] = result 
     if len(cache) > real_max_size: 
      cache.popitem(False) 
     return result 

    def wrapper(func): 
     @wraps(func) 
     def decorator(*args, **kwargs): 
      key = _make_key(args, kwargs, False) 
      if key in cache: 
       # Some protection against duplicating calls already in 
       # progress: when starting the call cache the future, and if 
       # the same thing is requested again return that future. 
       if isinstance(cache[key], asyncio.Future): 
        return cache[key] 
       else: 
        f = asyncio.Future() 
        f.set_result(cache[key]) 
        return f 
      else: 
       task = asyncio.Task(run_and_cache(func, args, kwargs)) 
       cache[key] = task 
       return task 
     return decorator 

    if callable(maxsize): 
     return wrapper(maxsize) 
    else: 
     return wrapper 

ho usato _make_key da functools come lru_cache fa, credo che dovrebbe essere privato così probabilmente meglio copiarlo.

0

Un'altra variante di LRU decoratore, che memorizza nella cache non ancora coroutine finiti, molto utile con le richieste parallele dello stesso tasto:

import asyncio 
from collections import OrderedDict 
from functools import _make_key, wraps 

def async_cache(maxsize=128, event_loop=None): 
    cache = OrderedDict() 
    if event_loop is None: 
     event_loop = asyncio.get_event_loop() 
    awaiting = dict() 

    async def run_and_cache(func, args, kwargs): 
     """await func with the specified arguments and store the result 
     in cache.""" 
     result = await func(*args, **kwargs) 
     key = _make_key(args, kwargs, False) 
     cache[key] = result 
     if len(cache) > maxsize: 
      cache.popitem(False) 
     cache.move_to_end(key) 
     return result 

    def decorator(func): 
     @wraps(func) 
     async def wrapper(*args, **kwargs): 
      key = _make_key(args, kwargs, False) 
      if key in cache: 
       return cache[key] 
      if key in awaiting: 
       task = awaiting[key] 
       return await asyncio.wait_for(task, timeout=None, loop=event_loop) 
      task = asyncio.ensure_future(run_and_cache(func, args, kwargs), loop=event_loop) 
      awaiting[key] = task 
      result = await asyncio.wait_for(task, timeout=None, loop=event_loop) 
      del awaiting[key] 
      return result 
     return wrapper 

    return decorator 


async def test_async_cache(event_loop): 
    counter = 0 
    n, m = 10, 3 

    @async_cache(maxsize=n, event_loop=event_loop) 
    async def cached_function(x): 
     nonlocal counter 
     await asyncio.sleep(0) # making event loop switch to other coroutine 
     counter += 1 
     return x 

    tasks = [asyncio.ensure_future(cached_function(x), loop=event_loop) 
      for x in list(range(n)) * m] 
    done, pending = await asyncio.wait(tasks, loop=event_loop, timeout=1) 
    assert len(done) == n * m 
    assert counter == n 

event_loop = asyncio.get_event_loop() 
task = asyncio.ensure_future(test_async_cache(event_loop)) 
event_loop.run_until_complete(task) 
2

Forse un po 'tardi, ma ho iniziato un nuovo pacchetto che può aiutare : https://github.com/argaen/aiocache. Contributi/commenti sono sempre ben accetti.

Un esempio:

import asyncio 

from collections import namedtuple 

from aiocache import cached 
from aiocache.serializers import PickleSerializer 

Result = namedtuple('Result', "content, status") 


@cached(ttl=10, serializer=PickleSerializer()) 
async def async_main(): 
    print("First ASYNC non cached call...") 
    await asyncio.sleep(1) 
    return Result("content", 200) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 

Si noti che come un extra, è possibile memorizzare nella cache ogni oggetto Python in Redis utilizzando Pickle serializzazione. Nel caso in cui si desideri lavorare con la memoria, è possibile utilizzare il backend SimpleMemoryCache :).

1

Per utilizzare functools.lru_cache con le coroutine, il seguente codice funziona.

class Cacheable: 
    def __init__(self, co): 
     self.co = co 
     self.done = False 
     self.result = None 
     self.lock = asyncio.Lock() 

    def __await__(self): 
     with (yield from self.lock): 
      if self.done: 
       return self.result 
      self.result = yield from self.co.__await__() 
      self.done = True 
      return self.result 

def cacheable(f): 
    def wrapped(*args, **kwargs): 
     r = f(*args, **kwargs) 
     return Cacheable(r) 
    return wrapped 


@functools.lru_cache() 
@cacheable 
async def foo(): 
    async with aiohttp.ClientSession() as session: 
     async with session.get(url) as resp: 
      return await resp.text() 

Di seguito è thread-safe

class ThreadSafeCacheable: 
    def __init__(self, co): 
     self.co = co 
     self.done = False 
     self.result = None 
     self.lock = threading.Lock() 

    def __await__(self): 
     while True: 
      if self.done: 
       return self.result 
      if self.lock.acquire(blocking=False): 
       self.result = yield from self.co.__await__() 
       self.done = True 
       return self.result 
      else: 
       yield from asyncio.sleep(0.005) 
Problemi correlati