Source code for speasy.core.cache.cache

from typing import Union
from datetime import datetime, timezone, timedelta

import diskcache as dc
from .version import str_to_version, version_to_str, Version
from speasy.config import cache as cache_cfg
from contextlib import ExitStack, contextmanager
import re
import logging

cache_version = str_to_version("3.0")

log = logging.getLogger(__name__)


[docs] class CacheItem: def __init__(self, data, version, lifetime=None): self.data = data self.version = version if lifetime is not None and isinstance(lifetime, (float, int)): lifetime = timedelta(seconds=lifetime) self.lifetime = lifetime self.created = datetime.now(tz=timezone.utc)
[docs] def bump_creation_time(self) -> "CacheItem": self.created = datetime.now(tz=timezone.utc) return self
def __setstate__(self, state): self.data = state["data"] self.version = state["version"] self.lifetime = state.get("lifetime", None) self.created = state.get("created", datetime.now(tz=timezone.utc))
[docs] def is_expired(self) -> bool: if isinstance(self.lifetime, timedelta): return datetime.now(tz=timezone.utc) > (self.created + self.lifetime) else: return False
[docs] class Cache: __slots__ = ['cache_file', '_data', 'cache_type'] def __init__(self, cache_path: str = "", cache_type='Cache'): cache_path = f"{cache_path}/{cache_type}" if cache_type == 'Fanout': self._data = dc.FanoutCache(cache_path, shards=8, size_limit=cache_cfg.size()) elif cache_type == 'Cache': self._data = dc.Cache(cache_path, size_limit=cache_cfg.size()) else: raise ValueError(f"Unimplemented cache type: {cache_type}") self.cache_type = cache_type self._data.stats(enable=True, reset=True) if self.version < cache_version: self._data.clear() self.version = cache_version @property def version(self): return str_to_version(self._data.get("cache/version", default="0.0.0")) @version.setter def version(self, v: Union[str, Version]): self._data["cache/version"] = v if type(v) is str else version_to_str(v)
[docs] def disk_size(self): return self._data.volume()
[docs] def stats(self): hit, miss = self._data.stats() return { "hit": hit, "misses": miss, }
def __len__(self): return len(self._data) def __del__(self): self._data.close()
[docs] def keys(self): return list(self._data)
def __contains__(self, item): return item in self._data def __getitem__(self, key): return self._data[key] def __setitem__(self, key, value): self._data[key] = value
[docs] def set(self, key, value, expire=None): self._data.set(key, value, expire=expire)
[docs] def add(self, key, value, expire=None): return self._data.add(key, value, expire=expire)
[docs] def get(self, key, default_value=None): return self._data.get(key, default_value)
[docs] def incr(self, key, delta=1, default=0): return self._data.incr(key, delta, default=default)
[docs] def drop(self, key): self._data.delete(key)
[docs] def drop_matching_entries(self, pattern: Union[str, re.Pattern]): """Drop all cache entries that match a given pattern Parameters ---------- pattern : str or re.Pattern The pattern to match cache keys against """ if isinstance(pattern, str): pattern = re.compile(pattern) for key in filter(pattern.match, self.keys()): log.debug(f"Dropping cache entry {key}") self.drop(key)
[docs] @contextmanager def transact(self): if self.cache_type != 'Fanout': with self._data.transact(): yield else: with ExitStack(): yield
[docs] @contextmanager def lock(self, key: str): lock = dc.Lock(self._data, key) lock.acquire() try: yield lock finally: lock.release()