Skip to content

Commit

Permalink
dev
Browse files Browse the repository at this point in the history
  • Loading branch information
davidhassell committed Aug 6, 2024
1 parent 57561a0 commit 581648d
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cf/data/collapse/collapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Collapse(metaclass=DocstringRewriteMeta):
data then it could even be faster and require less
energy to do non-active operation of the local client.
The performance improvements from using active storage
See `cf.data.collapse.collapse_active.actify` and
`cf.data.collapse.collapse_active.active_chunk_function` for
further details.
Expand Down
128 changes: 63 additions & 65 deletions cf/data/collapse/collapse_active.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,68 @@ class ActiveStorageError(Exception):
pass


def actify(method):
"""Decorator for active storage reductions on chunks.
Intended for to decorate the ``cf_*_chunk`` methods in
cf.data.collapse.dask_collapse`.
When a ``cf_*_chunk`` method is decorated, its computations will
be attempted in active storage. If that is not possible (due to
configuration settings, limitations on the type of reduction that
can be done in active storage, or the active storage reduction
failed) then the computations will be done locally "as usual".
.. versionadded:: NEXTVERSION
.. seealso:: `active_chunk_function`
:Parameters:
method: `str`
The name of the reduction method.
"""

def decorator(chunk_function):
@wraps(chunk_function)
def wrapper(*args, **kwargs):
try:
# Try doing an active storage reduction
out = active_chunk_function(method, *args, **kwargs)
except ActiveStorageError as error:
# The active storage reduction failed
logger.warning(
f"{error} => reverting to local computation"
) # pragma: no cover
else:
if out is not None:
# The active storage reduction succeeded
return out

# Still here? Then using active storage was not
# appropriate, or else doing the active storage operation
# failed => do a local computation.
return chunk_function(*args, **kwargs)

return wrapper

return decorator


def active_chunk_function(method, *args, **kwargs):
"""Collapse data in a chunk with active storage.
Called by the `actify` decorator function.
If an active storage reduction is not approriate then `None` is
If an active storage reduction is not appropriate then `None` is
returned.
If the active storage operation fails then ActiveStorageError is
raised.
If the active storage operation fails then an ActiveStorageError
is raised.
If the active storage operation is successful then a dictionary of
redcution components, similar to that returned by a ``cf_*_chunk``
reduction components, similar to that returned by a ``cf_*_chunk``
method, is returned.
.. versionadded:: NEXTVERSION
Expand All @@ -63,7 +112,7 @@ def active_chunk_function(method, *args, **kwargs):
`dict` or `None`
The reduced data in component form, or `None` if an active
storage reduction is not approriate.
storage reduction is not appropriate.
**Examples**
Expand Down Expand Up @@ -150,15 +199,16 @@ def active_chunk_function(method, *args, **kwargs):
}

index = x.index()


details = (
f"{method!r} (file={filename}, address={address}, url={url}, "
f"Dask chunk={index})"
)

info = is_log_level_info(logger)
if info:
# Do some detailed logging
start = time.time()
details = (
f"{method!r} (file={filename}, address={address}, url={url}, "
f"chunk={index})"
)
logger.info(
f"STARTED active storage {details}: {datetime.datetime.now()}"
) # pragma: no cover
Expand All @@ -178,14 +228,14 @@ def active_chunk_function(method, *args, **kwargs):
d = active[index]
except Exception as error:
# Something went wrong with the active storage operations =>
# Raise an ActiveStorageError that will in tuen trigger
# Raise an ActiveStorageError that will in turn trigger
# (inside `actify`) a local reduction to be carried out
# instead.
raise ActiveStorageError(
f"FAILED in active storage {details} ({error}))"
)
else:
# Active storage reduction was successful
# Active storage reduction was successful
if info:
# Do some detailed logging
try:
Expand All @@ -205,7 +255,7 @@ def active_chunk_function(method, *args, **kwargs):
f"selection 2 (s): {md['selection 2 time (s)']:6.2f}, "
f"Total: {(time.time() - start):6.2f}s"
) # pragma: no cover

# ----------------------------------------------------------------
# Active storage reduction was a success. Reformat the resulting
# components dictionary 'd' to match the output of the
Expand All @@ -225,55 +275,3 @@ def active_chunk_function(method, *args, **kwargs):
d = {"N": d["n"], "sum": d["sum"]}

return d


# --------------------------------------------------------------------
# Decorators
# --------------------------------------------------------------------
def actify(method):
"""Decorator for active storage reductions on chunks.
Intended for to decorate the ``cf_*_chunk`` methods in
cf.data.collapse.dask_collapse`.
When a ``cf_*_chunk`` method is decorated, its computations will
be attempted in active storage. If that is not possible (due to
configuration settings, limitations on the type of reduction that
can be done in active storage, or the active storage reduction
failed) then the computations will be done locally "as usual".
.. versionadded:: NEXTVERSION
.. seealso:: `active_chunk_function`
:Parameters:
method: `str`
The name of the reduction method.
"""

def decorator(chunk_function):
@wraps(chunk_function)
def wrapper(*args, **kwargs):
try:
# Try doing an active storage reduction
out = active_chunk_function(method, *args, **kwargs)
except ActiveStorageError as error:
# The active storage reduction failed
logger.warning(
f"{error} => reverting to local computation"
) # pragma: no cover
else:
if out is not None:
# The active storage reduction succeeded
return out

# Still here? Then using active storage is not
# appropriate, or else doing the active storage operation
# failed => do a local computation.
return chunk_function(*args, **kwargs)

return wrapper

return decorator
4 changes: 2 additions & 2 deletions cf/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,7 @@ def _parse(cls, arg):
"""
try:
from activestorage import Active
from activestorage import Active # noqa: F401
except ModuleNotFoundError as error:
if arg:
raise ModuleNotFoundError(
Expand Down Expand Up @@ -1358,7 +1358,7 @@ class active_storage_max_requests(ConstantAccess):
This formula only applies to cases where all `dask` chunks for the
collapse operation are utilising active storage. If some are not
then :math:`N` will likely be underestimated.
.. versionadded:: NEXTVERSION
.. seealso:: `active_storage`, `active_storage_url`,
Expand Down

0 comments on commit 581648d

Please sign in to comment.