Skip to content

Commit

Permalink
[fix] Also start pub sub listener/publisher (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Dec 8, 2023
1 parent 0a3cd8b commit 78237a9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
13 changes: 9 additions & 4 deletions fixcloudutils/redis/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,30 @@ def __init__(
self.local_cache: dict[Any, RedisCacheEntry] = {}
self.cached_functions: dict[str, Any] = {}
self.cleaner_task = Periodic("wipe_local_cache", self._wipe_outdated_from_local_cache, cleaner_task_frequency)
assert self.ttl_memory < self.ttl_redis, "ttl_memory must be smaller than ttl_redis"

async def start(self) -> None:
if self.started:
return
self.started = True
self.should_run = True
await self.cleaner_task.start()
await self.event_listener.start()
await self.event_publisher.start()
self.process_queue_task = create_task(self._process_queue())

async def stop(self) -> None:
if not self.started:
return
self.should_run = False
await self.event_publisher.stop()
await self.event_listener.stop()
await stop_running_task(self.process_queue_task)
await self.cleaner_task.stop()
self.started = False

async def evict(self, key: str) -> None:
log.info(f"{self.key}: Evict {key}")
log.debug(f"{self.key}: Evict {key}")
await self.queue.put(RedisCacheEvict(self._redis_key(key)))

def call(
Expand Down Expand Up @@ -168,9 +173,9 @@ async def _process_queue(self) -> None:
await self.redis.hset(name=entry.key, key=entry.fn_key, value=value) # type: ignore
await self.redis.expire(name=entry.key, time=entry.ttl)
elif isinstance(entry, RedisCacheEvict):
log.info(f"{self.key}: Delete cached value from redis key {entry.key}")
# delete the entry
await self.redis.delete(entry.key)
if (await self.redis.delete(entry.key)) > 0:
log.info(f"{self.key}: Deleted cached value from redis key {entry.key}")
# inform all other cache instances to evict the key
await self.event_publisher.publish("evict", {"redis_key": entry.key})
# delete from local cache
Expand All @@ -186,7 +191,7 @@ async def _handle_evict_message(self, uid: str, at: datetime, publisher: str, ki
"""
PubSub listener for evict messages.
"""
log.info(f"Received message: {kind} {data} from {publisher} at {at} by {uid}")
log.debug(f"Received message: {kind} {data} from {publisher} at {at} by {uid}")
if kind == "evict" and (redis_key := data.get("redis_key")):
# delete from local cache
self._remove_from_local_cache(redis_key)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "fixcloudutils"
version = "1.13.0"
version = "1.13.1"
authors = [{ name = "Some Engineering Inc." }]
description = "Utilities for fixcloud."
license = { file = "LICENSE" }
Expand Down

0 comments on commit 78237a9

Please sign in to comment.