From 09d9f68ca989970ea1f418250ebd48eea57792ec Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Fri, 8 Dec 2023 20:41:17 +0100 Subject: [PATCH] [fix] Also start pub sub listener/publisher --- fixcloudutils/redis/cache.py | 13 +++++++++---- pyproject.toml | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/fixcloudutils/redis/cache.py b/fixcloudutils/redis/cache.py index 4006bed..7ce9b8e 100644 --- a/fixcloudutils/redis/cache.py +++ b/fixcloudutils/redis/cache.py @@ -87,6 +87,7 @@ def __init__( self.local_cache: dict[Any, RedisCacheEntry] = {} self.cached_functions: dict[str, Any] = {} self.cleaner_task = Periodic("wipe_local_cache", self._wipe_outdated_from_local_cache, cleaner_task_frequency) + assert self.ttl_memory < self.ttl_redis, "ttl_memory must be smaller than ttl_redis" async def start(self) -> None: if self.started: @@ -94,18 +95,22 @@ async def start(self) -> None: self.started = True self.should_run = True await self.cleaner_task.start() + await self.event_listener.start() + await self.event_publisher.start() self.process_queue_task = create_task(self._process_queue()) async def stop(self) -> None: if not self.started: return self.should_run = False + await self.event_publisher.stop() + await self.event_listener.stop() await stop_running_task(self.process_queue_task) await self.cleaner_task.stop() self.started = False async def evict(self, key: str) -> None: - log.info(f"{self.key}: Evict {key}") + log.debug(f"{self.key}: Evict {key}") await self.queue.put(RedisCacheEvict(self._redis_key(key))) def call( @@ -168,9 +173,9 @@ async def _process_queue(self) -> None: await self.redis.hset(name=entry.key, key=entry.fn_key, value=value) # type: ignore await self.redis.expire(name=entry.key, time=entry.ttl) elif isinstance(entry, RedisCacheEvict): - log.info(f"{self.key}: Delete cached value from redis key {entry.key}") # delete the entry - await self.redis.delete(entry.key) + if (await self.redis.delete(entry.key)) > 0: + log.info(f"{self.key}: Deleted cached value from redis key {entry.key}") # inform all other cache instances to evict the key await self.event_publisher.publish("evict", {"redis_key": entry.key}) # delete from local cache @@ -186,7 +191,7 @@ async def _handle_evict_message(self, uid: str, at: datetime, publisher: str, ki """ PubSub listener for evict messages. """ - log.info(f"Received message: {kind} {data} from {publisher} at {at} by {uid}") + log.debug(f"Received message: {kind} {data} from {publisher} at {at} by {uid}") if kind == "evict" and (redis_key := data.get("redis_key")): # delete from local cache self._remove_from_local_cache(redis_key) diff --git a/pyproject.toml b/pyproject.toml index aa8486f..6e3c70f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fixcloudutils" -version = "1.13.0" +version = "1.13.1" authors = [{ name = "Some Engineering Inc." }] description = "Utilities for fixcloud." license = { file = "LICENSE" }