Skip to content

Commit

Permalink
Add ability to configure standard datastore cache to be disabled
Browse files Browse the repository at this point in the history
This allows code to use the normal cache with disabled configruation
but allow external environment variables to turn it on. This is
not possible if the code is using an explicit disabled cache
manager class.
  • Loading branch information
timj committed Aug 19, 2024
1 parent 114fc36 commit 392dec6
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ datastore:
# Expiry mode and associated threshold.
# Options are:
# - null (no expiry)
# - disabled (no caching)
# - files (threshold is number of files)
# - datasets (threshold is number of datasets)
# - size (threshold is size in bytes)
Expand Down
87 changes: 64 additions & 23 deletions python/lsst/daf/butler/datastore/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,54 @@ class DatastoreCacheManager(AbstractDatastoreCacheManager):
def __init__(self, config: str | DatastoreCacheManagerConfig, universe: DimensionUniverse):
super().__init__(config, universe)

# Expiration mode. Read from config but allow override from
# the environment.
expiration_mode = self.config.get(("expiry", "mode"))
threshold = self.config.get(("expiry", "threshold"))

external_mode = os.environ.get("DAF_BUTLER_CACHE_EXPIRATION_MODE")
if external_mode:
if external_mode == "disabled":
expiration_mode = external_mode
threshold = 0

Check warning on line 492 in python/lsst/daf/butler/datastore/cache_manager.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastore/cache_manager.py#L491-L492

Added lines #L491 - L492 were not covered by tests
elif "=" in external_mode:
expiration_mode, expiration_threshold = external_mode.split("=", 1)
threshold = int(expiration_threshold)

Check warning on line 495 in python/lsst/daf/butler/datastore/cache_manager.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastore/cache_manager.py#L494-L495

Added lines #L494 - L495 were not covered by tests
else:
log.warning(

Check warning on line 497 in python/lsst/daf/butler/datastore/cache_manager.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastore/cache_manager.py#L497

Added line #L497 was not covered by tests
"Unrecognized form (%s) for DAF_BUTLER_CACHE_EXPIRATION_MODE environment variable. "
"Ignoring.",
external_mode,
)
if expiration_mode is None:
# Force to None to avoid confusion.
threshold = None

allowed = ("disabled", "datasets", "age", "size", "files")
if expiration_mode and expiration_mode not in allowed:
raise ValueError(

Check warning on line 508 in python/lsst/daf/butler/datastore/cache_manager.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastore/cache_manager.py#L508

Added line #L508 was not covered by tests
f"Unrecognized value for cache expiration mode. Got {expiration_mode} but expected "
+ ",".join(allowed)
)

self._expiration_mode: str | None = expiration_mode
self._expiration_threshold: int | None = threshold
if self._expiration_threshold is None and self._expiration_mode is not None:
raise ValueError(

Check warning on line 516 in python/lsst/daf/butler/datastore/cache_manager.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastore/cache_manager.py#L516

Added line #L516 was not covered by tests
f"Cache expiration threshold must be set for expiration mode {self._expiration_mode}"
)

# Files in cache, indexed by path within the cache directory.
self._cache_entries = CacheRegistry()

# No need for additional configuration if the cache has been disabled.
if self._expiration_mode == "disabled":
log.debug("Cache configured in disabled state.")
self._cache_directory: ResourcePath | None = ResourcePath(
"datastore_cache_disabled", forceAbsolute=False, forceDirectory=True
)
return

# Set cache directory if it pre-exists, else defer creation until
# requested. Allow external override from environment.
root = os.environ.get("DAF_BUTLER_CACHE_DIRECTORY") or self.config.get("root")
Expand Down Expand Up @@ -510,35 +558,12 @@ def __init__(self, config: str | DatastoreCacheManagerConfig, universe: Dimensio
# Default decision to for whether a dataset should be cached.
self._caching_default = self.config.get("default", False)

# Expiration mode. Read from config but allow override from
# the environment.
expiration_mode = self.config.get(("expiry", "mode"))
threshold = self.config.get(("expiry", "threshold"))

external_mode = os.environ.get("DAF_BUTLER_CACHE_EXPIRATION_MODE")
if external_mode and "=" in external_mode:
expiration_mode, expiration_threshold = external_mode.split("=", 1)
threshold = int(expiration_threshold)
if expiration_mode is None:
# Force to None to avoid confusion.
threshold = None

self._expiration_mode: str | None = expiration_mode
self._expiration_threshold: int | None = threshold
if self._expiration_threshold is None and self._expiration_mode is not None:
raise ValueError(
f"Cache expiration threshold must be set for expiration mode {self._expiration_mode}"
)

log.debug(
"Cache configuration:\n- root: %s\n- expiration mode: %s",
self._cache_directory if self._cache_directory else "tmpdir",
f"{self._expiration_mode}={self._expiration_threshold}" if self._expiration_mode else "disabled",
)

# Files in cache, indexed by path within the cache directory.
self._cache_entries = CacheRegistry()

@property
def cache_directory(self) -> ResourcePath:
if self._cache_directory is None:
Expand Down Expand Up @@ -625,6 +650,9 @@ def set_fallback_cache_directory_if_unset(cls) -> tuple[bool, str]:

def should_be_cached(self, entity: DatasetRef | DatasetType | StorageClass) -> bool:
# Docstring inherited
if self._expiration_mode == "disabled":
return False

matchName: LookupKey | str = f"{entity} (via default)"
should_cache = self._caching_default

Expand Down Expand Up @@ -662,6 +690,8 @@ def _construct_cache_name(self, ref: DatasetRef, extension: str) -> ResourcePath

def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> ResourcePath | None:
# Docstring inherited
if self._expiration_mode == "disabled":
return None
if not self.should_be_cached(ref):
return None

Expand Down Expand Up @@ -696,6 +726,10 @@ def move_to_cache(self, uri: ResourcePath, ref: DatasetRef) -> ResourcePath | No
@contextlib.contextmanager
def find_in_cache(self, ref: DatasetRef, extension: str) -> Iterator[ResourcePath | None]:
# Docstring inherited
if self._expiration_mode == "disabled":
yield None
return

# Short circuit this if the cache directory has not been created yet.
if self._cache_directory is None:
yield None
Expand Down Expand Up @@ -810,6 +844,9 @@ def _register_cache_entry(self, cached_location: ResourcePath, can_exist: bool =

def scan_cache(self) -> None:
"""Scan the cache directory and record information about files."""
if self._expiration_mode == "disabled":
return

Check warning on line 848 in python/lsst/daf/butler/datastore/cache_manager.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/datastore/cache_manager.py#L848

Added line #L848 was not covered by tests

found = set()
for file in ResourcePath.findFileResources([self.cache_directory]):
assert isinstance(file, ResourcePath), "Unexpectedly did not get ResourcePath from iterator"
Expand Down Expand Up @@ -865,6 +902,8 @@ def known_to_cache(self, ref: DatasetRef, extension: str | None = None) -> bool:
This method does not force the cache to be re-scanned and so can miss
cached datasets that have recently been written by other processes.
"""
if self._expiration_mode == "disabled":
return False
if self._cache_directory is None:
return False
if self.file_count == 0:
Expand Down Expand Up @@ -1002,6 +1041,8 @@ def _sort_by_time(key: str) -> datetime.datetime:
return sorted(self._cache_entries, key=_sort_by_time)

def __str__(self) -> str:
if self._expiration_mode == "disabled":
return f"""{type(self).__name__}(disabled)"""
cachedir = self._cache_directory if self._cache_directory else "<tempdir>"
return (
f"{type(self).__name__}@{cachedir} ({self._expiration_mode}={self._expiration_threshold},"
Expand Down
12 changes: 12 additions & 0 deletions tests/test_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1896,6 +1896,18 @@ def testCacheExpirySize(self) -> None:
self.assertExpiration(cache_manager, 10, 6)
self.assertIn(f"{mode}={threshold}", str(cache_manager))

def testDisabledCache(self) -> None:
threshold = 0
mode = "disabled"
config_str = self._expiration_config(mode, threshold)
cache_manager = self._make_cache_manager(config_str)
for uri, ref in zip(self.files, self.refs, strict=True):
self.assertFalse(cache_manager.should_be_cached(ref))
self.assertIsNone(cache_manager.move_to_cache(uri, ref))
self.assertFalse(cache_manager.known_to_cache(ref))
with cache_manager.find_in_cache(ref, ".txt") as found:
self.assertIsNone(found, msg=f"{cache_manager}")

def assertExpiration(
self, cache_manager: DatastoreCacheManager, n_datasets: int, n_retained: int
) -> None:
Expand Down

0 comments on commit 392dec6

Please sign in to comment.