Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move execute() to its own API namespace, disambiguate #580

Merged
merged 6 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/sections/user_guide/api/execute.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
``uwtools.api.execute``
=======================

.. automodule:: uwtools.api.execute
:inherited-members:
:members:
1 change: 1 addition & 0 deletions docs/sections/user_guide/api/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ API
config
driver
esg_grid
execute
file
filter_topo
fv3
Expand Down
153 changes: 0 additions & 153 deletions src/uwtools/api/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,6 @@
API access to the ``uwtools`` driver base classes.
"""

from datetime import datetime, timedelta
from importlib import import_module
from importlib.util import module_from_spec, spec_from_file_location
from inspect import getfullargspec
from pathlib import Path
from types import ModuleType
from typing import Optional, Type, Union

from uwtools.drivers.driver import (
Assets,
AssetsCycleBased,
Expand All @@ -20,148 +12,6 @@
DriverCycleLeadtimeBased,
DriverTimeInvariant,
)
from uwtools.drivers.support import graph
from uwtools.drivers.support import tasks as _tasks
from uwtools.logging import log
from uwtools.strings import STR
from uwtools.utils.api import ensure_data_source


def execute(
module: Union[Path, str],
classname: str,
task: str,
schema_file: Optional[str] = None,
config: Optional[Union[Path, str]] = None,
cycle: Optional[datetime] = None, # pylint: disable=unused-argument
leadtime: Optional[timedelta] = None, # pylint: disable=unused-argument
batch: Optional[bool] = False, # pylint: disable=unused-argument
dry_run: Optional[bool] = False,
graph_file: Optional[Union[Path, str]] = None,
key_path: Optional[list[str]] = None,
stdin_ok: Optional[bool] = False,
) -> bool:
"""
Execute a task.

If ``batch`` is specified, a runscript will be written and submitted to the batch system.
Otherwise, the executable will be run directly on the current system.

:param module: Path to driver module or name of module on sys.path.
:param classname: Name of driver class to instantiate.
:param task: Name of driver task to execute.
:param schema_file: Path to schema file.
:param config: Path to config file (read stdin if missing or None).
:param cycle: The cycle.
:param leadtime: The leadtime.
:param batch: Submit run to the batch system?
:param dry_run: Do not run the executable, just report what would have been done.
:param graph_file: Write Graphviz DOT output here.
:param key_path: Path of keys to subsection of config file.
:param stdin_ok: OK to read from stdin?
:return: ``True`` if task completes without raising an exception.
"""
class_, module_path = _get_driver_class(module, classname)
if not class_:
return False
assert module_path is not None
args = dict(locals())
accepted = set(getfullargspec(class_).args)
non_optional = {STR.cycle, STR.leadtime}
for arg in sorted([STR.batch, *non_optional]):
if args.get(arg) and arg not in accepted:
log.error("%s does not accept argument '%s'", classname, arg)
return False
for arg in sorted(non_optional):
if arg in accepted and args[arg] is None:
log.error("%s requires argument '%s'", classname, arg)
return False
kwargs = dict(
config=ensure_data_source(config, bool(stdin_ok)),
dry_run=dry_run,
key_path=key_path,
schema_file=schema_file or module_path.with_suffix(".jsonschema"),
)
required = non_optional & accepted
for arg in sorted([STR.batch, *required]):
if arg in accepted:
kwargs[arg] = args[arg]
driverobj = class_(**kwargs)
log.debug("Instantiated %s with: %s", classname, kwargs)
getattr(driverobj, task)()
if graph_file:
with open(graph_file, "w", encoding="utf-8") as f:
print(graph(), file=f)
return True


def tasks(module: Union[Path, str], classname: str) -> dict[str, str]:
"""
Returns a mapping from task names to their one-line descriptions.

:param module: Name of driver module.
:param classname: Name of driver class to instantiate.
"""
class_, _ = _get_driver_class(module, classname)
if not class_:
log.error("Could not get tasks from class %s in module %s", classname, module)
return {}
return _tasks(class_)


def _get_driver_class(
module: Union[Path, str], classname: str
) -> tuple[Optional[Type], Optional[Path]]:
"""
Returns the driver class.

:param module: Name of driver module to load.
:param classname: Name of driver class to instantiate.
"""
if not (m := _get_driver_module_explicit(Path(module))):
if not (m := _get_driver_module_implicit(str(module))):
log.error("Could not load module %s", module)
return None, None
assert m.__file__ is not None
module_path = Path(m.__file__)
if hasattr(m, classname):
c: Type = getattr(m, classname)
return c, module_path
log.error("Module %s has no class %s", module, classname)
return None, module_path


def _get_driver_module_explicit(module: Path) -> Optional[ModuleType]:
"""
Returns the named module found via explicit lookup of given path.

:param module: Name of driver module to load.
"""
log.debug("Loading module %s", module)
if spec := spec_from_file_location(module.name, module):
m = module_from_spec(spec)
if loader := spec.loader:
try:
loader.exec_module(m)
log.debug("Loaded module %s", module)
return m
except Exception: # pylint: disable=broad-exception-caught
pass
return None


def _get_driver_module_implicit(module: str) -> Optional[ModuleType]:
"""
Returns the named module found via implicit (sys.path-based) lookup.

:param module: Name of driver module to load.
"""
log.debug("Loading module %s from sys.path", module)
try:
return import_module(module)
except Exception: # pylint: disable=broad-exception-caught
return None


__all__ = [
"Assets",
Expand All @@ -172,7 +22,4 @@ def _get_driver_module_implicit(module: str) -> Optional[ModuleType]:
"DriverCycleBased",
"DriverCycleLeadtimeBased",
"DriverTimeInvariant",
"execute",
"graph",
"tasks",
]
157 changes: 157 additions & 0 deletions src/uwtools/api/execute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
API support for interacting with external drivers.
"""

from datetime import datetime, timedelta
from importlib import import_module
from importlib.util import module_from_spec, spec_from_file_location
from inspect import getfullargspec
from pathlib import Path
from types import ModuleType
from typing import Optional, Type, Union

from uwtools.drivers.support import graph
from uwtools.drivers.support import tasks as _tasks
from uwtools.logging import log
from uwtools.strings import STR
from uwtools.utils.api import ensure_data_source


def execute(
module: Union[Path, str],
classname: str,
task: str,
schema_file: Optional[str] = None,
config: Optional[Union[Path, str]] = None,
cycle: Optional[datetime] = None, # pylint: disable=unused-argument
leadtime: Optional[timedelta] = None, # pylint: disable=unused-argument
batch: Optional[bool] = False, # pylint: disable=unused-argument
dry_run: Optional[bool] = False,
graph_file: Optional[Union[Path, str]] = None,
key_path: Optional[list[str]] = None,
stdin_ok: Optional[bool] = False,
) -> bool:
"""
Execute a driver task.

If ``batch`` is specified and the driver is instructed to run, its runscript will be configured
for and submitted to the batch system. Otherwise, the executable will be run directly on the
current system.

:param module: Path to driver module or name of module on sys.path.
:param classname: Name of driver class to instantiate.
:param task: Name of driver task to execute.
:param schema_file: Path to schema file.
:param config: Path to config file (read stdin if missing or None).
:param cycle: The cycle.
:param leadtime: The leadtime.
:param batch: Submit run to the batch system?
:param dry_run: Do not run the executable, just report what would have been done.
:param graph_file: Write Graphviz DOT output here.
:param key_path: Path of keys to subsection of config file.
:param stdin_ok: OK to read from stdin?
:return: ``True`` if task completes without raising an exception.
"""
class_, module_path = _get_driver_class(module, classname)
if not class_:
return False
assert module_path is not None
args = dict(locals())
accepted = set(getfullargspec(class_).args)
non_optional = {STR.cycle, STR.leadtime}
for arg in sorted([STR.batch, *non_optional]):
if args.get(arg) and arg not in accepted:
log.error("%s does not accept argument '%s'", classname, arg)
return False
for arg in sorted(non_optional):
if arg in accepted and args[arg] is None:
log.error("%s requires argument '%s'", classname, arg)
return False
kwargs = dict(
config=ensure_data_source(config, bool(stdin_ok)),
dry_run=dry_run,
key_path=key_path,
schema_file=schema_file or module_path.with_suffix(".jsonschema"),
)
required = non_optional & accepted
for arg in sorted([STR.batch, *required]):
if arg in accepted:
kwargs[arg] = args[arg]
driverobj = class_(**kwargs)
log.debug("Instantiated %s with: %s", classname, kwargs)
getattr(driverobj, task)()
if graph_file:
with open(graph_file, "w", encoding="utf-8") as f:
print(graph(), file=f)
return True


def tasks(module: Union[Path, str], classname: str) -> dict[str, str]:
"""
Returns a mapping from driver task names to their one-line descriptions.

:param module: Name of driver module.
:param classname: Name of driver class to instantiate.
"""
class_, _ = _get_driver_class(module, classname)
if not class_:
log.error("Could not get tasks from class %s in module %s", classname, module)
return {}
return _tasks(class_)


def _get_driver_class(
module: Union[Path, str], classname: str
) -> tuple[Optional[Type], Optional[Path]]:
"""
Returns the driver class.

:param module: Name of driver module to load.
:param classname: Name of driver class to instantiate.
"""
if not (m := _get_driver_module_explicit(Path(module))):
if not (m := _get_driver_module_implicit(str(module))):
log.error("Could not load module %s", module)
return None, None
assert m.__file__ is not None
module_path = Path(m.__file__)
if hasattr(m, classname):
c: Type = getattr(m, classname)
return c, module_path
log.error("Module %s has no class %s", module, classname)
return None, module_path


def _get_driver_module_explicit(module: Path) -> Optional[ModuleType]:
"""
Returns the named module found via explicit lookup of given path.

:param module: Name of driver module to load.
"""
log.debug("Loading module %s", module)
if spec := spec_from_file_location(module.name, module):
m = module_from_spec(spec)
if loader := spec.loader:
try:
loader.exec_module(m)
log.debug("Loaded module %s", module)
return m
except Exception: # pylint: disable=broad-exception-caught
pass
return None


def _get_driver_module_implicit(module: str) -> Optional[ModuleType]:
"""
Returns the named module found via implicit (sys.path-based) lookup.

:param module: Name of driver module to load.
"""
log.debug("Loading module %s from sys.path", module)
try:
return import_module(module)
except Exception: # pylint: disable=broad-exception-caught
return None


__all__ = ["execute", "graph", "tasks"]
3 changes: 2 additions & 1 deletion src/uwtools/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import uwtools.api
import uwtools.api.config
import uwtools.api.driver
import uwtools.api.execute
import uwtools.api.file
import uwtools.api.rocoto
import uwtools.api.template
Expand Down Expand Up @@ -288,7 +289,7 @@ def _dispatch_execute(args: Args) -> bool:

:param args: Parsed command-line args.
"""
return uwtools.api.driver.execute(
return uwtools.api.execute.execute(
classname=args[STR.classname],
module=args[STR.module],
task=args[STR.task],
Expand Down
4 changes: 2 additions & 2 deletions src/uwtools/drivers/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from uwtools.scheduler import JobScheduler
from uwtools.strings import STR
from uwtools.utils.file import writable
from uwtools.utils.processing import execute
from uwtools.utils.processing import shellcmd
maddenp-noaa marked this conversation as resolved.
Show resolved Hide resolved

# NB: Class docstrings are programmatically defined.

Expand Down Expand Up @@ -380,7 +380,7 @@ def _run_via_local_execution(self):
yield asset(path, path.is_file)
yield self.provisioned_rundir()
cmd = "{x} >{x}.out 2>&1".format(x=self._runscript_path)
execute(cmd=cmd, cwd=self.rundir, log_output=True)
shellcmd(cmd=cmd, cwd=self.rundir, log_output=True)

# Private helper methods

Expand Down
4 changes: 2 additions & 2 deletions src/uwtools/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from uwtools.exceptions import UWConfigError
from uwtools.logging import log
from uwtools.strings import STR
from uwtools.utils.processing import execute
from uwtools.utils.processing import shellcmd


class JobScheduler(ABC):
Expand Down Expand Up @@ -80,7 +80,7 @@ def submit_job(self, runscript: Path, submit_file: Optional[Path] = None) -> boo
cmd = f"{self._submit_cmd} {runscript}"
if submit_file:
cmd += " 2>&1 | tee %s" % submit_file
success, _ = execute(cmd=cmd, cwd=f"{runscript.parent}")
success, _ = shellcmd(cmd=cmd, cwd=f"{runscript.parent}")
return success

# Private methods
Expand Down
Loading
Loading