From eaa129b598436dd7c517e9a0e11c6f90cdffcad3 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 29 Nov 2022 01:27:09 -0500 Subject: [PATCH] add ResourceRequirements tests [partial] (relates to https://github.com/crim-ca/weaver/issues/138) + ensure big job log messages are clipped --- CHANGES.rst | 3 +- .../application-packages/ReadFile/deploy.yml | 2 +- .../SimulateResourceUsage/deploy.yml | 15 +++ .../SimulateResourceUsage/job.yml | 4 + .../SimulateResourceUsage/package.cwl | 45 ++++++++ tests/functional/test_wps_package.py | 104 +++++++++++++++++- weaver/datatype.py | 72 ++++++++---- weaver/processes/wps_package.py | 3 +- weaver/store/mongodb.py | 2 +- 9 files changed, 221 insertions(+), 29 deletions(-) create mode 100644 tests/functional/application-packages/SimulateResourceUsage/deploy.yml create mode 100644 tests/functional/application-packages/SimulateResourceUsage/job.yml create mode 100644 tests/functional/application-packages/SimulateResourceUsage/package.cwl diff --git a/CHANGES.rst b/CHANGES.rst index 3e2f84183..39d136c54 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,6 +12,8 @@ Changes Changes: -------- +- Add `Job` log message size checks to better control what gets logged during the `Application Package` execution to + avoid large documents causing problems when attempting to save them to the storage database. - Update documentation with examples for ``cwltool:CUDARequirement``, ``ResourceRequirement`` and ``NetworkAccess``. - Improve schema definition of ``ResourceRequirement``. - Deprecate ``DockerGpuRequirement``, with attempts to auto-convert it into corresponding ``DockerRequirement`` @@ -27,7 +29,6 @@ Fixes: Use ``packaging.version.Version`` substitute whenever possible, but preserve backward compatibility with ``distutils`` in case of older Python not supporting it. - Fix ``cli._update_files`` so there are no attempts to upload remote references to the `Vault`. -- No change. .. 
_changes_4.27.0: diff --git a/tests/functional/application-packages/ReadFile/deploy.yml b/tests/functional/application-packages/ReadFile/deploy.yml index b5828ed8e..05d4703fb 100644 --- a/tests/functional/application-packages/ReadFile/deploy.yml +++ b/tests/functional/application-packages/ReadFile/deploy.yml @@ -19,5 +19,5 @@ outputTransmission: - reference executionUnit: # note: This does not work by itself! The test suite injects the file dynamically. - - href: "tests/functional/application-packages/CatValue/cat.cwl" + - href: "tests/functional/application-packages/ReadValue/package.cwl" deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication" diff --git a/tests/functional/application-packages/SimulateResourceUsage/deploy.yml b/tests/functional/application-packages/SimulateResourceUsage/deploy.yml new file mode 100644 index 000000000..f3ffd0d5c --- /dev/null +++ b/tests/functional/application-packages/SimulateResourceUsage/deploy.yml @@ -0,0 +1,15 @@ +# YAML representation supported by WeaverClient +processDescription: + id: SimulateResourceUsage + title: Gradually allocate RAM to simulate a process load. + version: "1.0" + keywords: + - test +jobControlOptions: + - async-execute +outputTransmission: + - reference +executionUnit: + # note: This does not work by itself! The test suite injects the file dynamically. 
+ - href: "tests/functional/application-packages/SimulateResourceUsage/package.cwl" +deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication" diff --git a/tests/functional/application-packages/SimulateResourceUsage/job.yml b/tests/functional/application-packages/SimulateResourceUsage/job.yml new file mode 100644 index 000000000..4e1653283 --- /dev/null +++ b/tests/functional/application-packages/SimulateResourceUsage/job.yml @@ -0,0 +1,4 @@ +ram_amount: 4 +ram_chunks: 16 +time_duration: 1 +time_interval: 0.25 diff --git a/tests/functional/application-packages/SimulateResourceUsage/package.cwl b/tests/functional/application-packages/SimulateResourceUsage/package.cwl new file mode 100644 index 000000000..29339c0af --- /dev/null +++ b/tests/functional/application-packages/SimulateResourceUsage/package.cwl @@ -0,0 +1,45 @@ +#!/usr/bin/env cwl-runner +# WARNING: +# This process can generate a large memory load and a very large output file that captures the generated data. +# Even with default ResourceRequirement values, the output can become substantial rapidly. +cwlVersion: "v1.0" +class: CommandLineTool +baseCommand: + - bash + - script.sh +requirements: + DockerRequirement: + dockerPull: debian:stretch-slim + InitialWorkDirRequirement: + listing: + # below script is generated dynamically in the working directory, and then called by the base command + # reference: https://unix.stackexchange.com/a/254976/288952 + - entryname: script.sh + entry: | + echo "Will allocate RAM chunks of $(inputs.ram_chunks) MiB." + echo "Will allocate RAM chunks in increments up to $(inputs.ram_amount) times." + echo "Will maintain allocated RAM load for $(inputs.time_duration)s for each increment." + echo "Will wait $(inputs.time_interval)s between each allocation." + echo "Begin allocating memory..." + for index in \$(seq $(inputs.ram_amount)); do + echo "Allocating \$index x $(inputs.ram_chunks) MiB for $(inputs.time_duration)s..." 
+ cat <( None + """ + Test that :data:`CWL_REQUIREMENT_RESOURCE` are considered for :term:`Process` execution. + + .. note:: + This test also conveniently serves for testing how large :term:`Job` logs are handled by the storage. + Because of the large output produced and captured in the logs, saving them directly to the database + is not supported. The :term:`Job` should therefore filter problematic entries to the log. + """ + proc = "SimulateResourceUsage" + body = self.retrieve_payload(proc, "deploy", local=True) + pkg = self.retrieve_payload(proc, "package", local=True) + pkg["requirements"].update(resource_requirement) + body["executionUnit"] = [{"unit": pkg}] + self.deploy_process(body, describe_schema=ProcessSchema.OGC) + + exec_body = { + "mode": ExecuteMode.ASYNC, + "response": ExecuteResponse.DOCUMENT, + "inputs": { + "ram_chunks": ram_chunks_mb, + "ram_amount": ram_amount_mb, + "time_duration": time_duration_s, + "time_interval": time_interval_s, + }, + "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.REFERENCE}] + } + out_dir = None + try: + with contextlib.ExitStack() as stack: + for mock_exec in mocked_execute_celery(): + stack.enter_context(mock_exec) + proc_url = f"/processes/{proc}/jobs" + resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5, + data=exec_body, headers=self.json_headers, only_local=True) + assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}" + status_url = resp.json["location"] + job_id = resp.json["jobID"] + wps_dir = get_wps_output_dir(self.settings) + out_dir = os.path.join(wps_dir, job_id, "output") + + results = self.monitor_job(status_url, expect_failed=expect_fail) + assert "output" in results + out_log = os.path.join(out_dir, "stdout.log") + assert os.path.isfile(out_log) + assert os.stat(out_log).st_size >= expect_size_min_mb * 2**20 + with open(out_log, mode="r", encoding="utf-8") as out_file: + output = (line for line in 
out_file.readlines() if line[0] != "\0") + output = list(output) + assert all( + any(f"Allocating {i} x {ram_chunks_mb} MiB" in line for line in output) + for i in range(1, ram_amount_mb + 1) + ) + + log_url = f"{status_url}/logs" + log_resp = mocked_sub_requests(self.app, "get", log_url, timeout=5, + headers=self.json_headers, only_local=True) + job_logs = log_resp.json + assert all( + any(f"Allocating {i} x {ram_chunks_mb} MiB" in line for line in job_logs) + for i in range(1, ram_amount_mb + 1) + ) + assert all( + any( + f"" + in line for line in job_logs + ) + for i in range(1, ram_amount_mb + 1) + ) + + stat_url = f"{status_url}/statistics" + stat_resp = mocked_sub_requests(self.app, "get", stat_url, timeout=5, + headers=self.json_headers, only_local=True) + job_stats = stat_resp.json + assert all( + job_stats["process"][mem] > expect_ram_min_mb + for mem in ["rssBytes", "ussBytes", "vmsBytes"] + ) + finally: + if out_dir: + shutil.rmtree(out_dir, ignore_errors=True) + # FIXME: create a real async test (threading/multiprocess) to evaluate this correctly def test_dismiss_job(self): """ diff --git a/weaver/datatype.py b/weaver/datatype.py index a523f660f..459e3dbe1 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -44,6 +44,7 @@ from weaver.utils import localize_datetime # for backward compatibility of previously saved jobs not time-locale-aware from weaver.utils import ( VersionFormat, + apply_number_with_unit, as_version_major_minor_patch, extend_instance, fully_qualified_name, @@ -378,7 +379,7 @@ def wps(self, container=None, **kwargs): try: _wps = self.get("_wps") if _wps is None: - # client retrieval could also be cached if recently fetched an not yet invalidated + # client retrieval could also be cached if recently fetched and not yet invalidated self["_wps"] = _wps = get_wps_client(self.url, container=container, **kwargs) return _wps except (OWSServiceException, xml_util.ParseError) as exc: @@ -579,7 +580,7 @@ def check_accessible(self, 
settings, ignore=True): meth = "HEAD" url = self.url # - allow 500 for services that incorrectly handle invalid request params, but at least respond - # (should be acceptable in this case because the 'ping' request is not necessarily well formed) + # (should be acceptable in this case because the 'ping' request is not necessarily well-formed) # - allow 400/405 for bad request/method directly reported by the service for the same reasons # - enforce quick timeout (but don't allow 408 code) to avoid long pending connexions that never resolve allowed_codes = [200, 400, 405, 500] @@ -620,27 +621,38 @@ def __init__(self, *args, **kwargs): if not isinstance(self.id, (str, uuid.UUID)): raise TypeError(f"Type 'str' or 'UUID' is required for '{self.__name__}.id'") - def _get_log_msg(self, msg=None, status=None, progress=None): - # type: (Optional[str], Optional[AnyStatusType], Optional[Number]) -> str - if not msg: - msg = self.status_message + @staticmethod + def _get_message(message, size_limit=None): + # type: (str, Optional[int]) -> str + msg_len = len(message) + size_limit = size_limit if isinstance(size_limit, int) and size_limit > 0 else 1024**2 + if len(message) > size_limit: + msg_size = apply_number_with_unit(msg_len, binary=True, decimals=2) + return f"<message clipped due to exceeded size limit ({msg_size})>" + return message + + def _get_log_msg(self, msg=None, status=None, progress=None, size_limit=None): + # type: (Optional[str], Optional[AnyStatusType], Optional[Number], Optional[int]) -> str + msg = self._get_message(msg or self.status_message, size_limit=size_limit) status = map_status(status or self.status) progress = max(0, min(100, progress or self.progress)) return get_job_log_msg(duration=self.duration_str, progress=progress, status=status, message=msg) @staticmethod - def _get_err_msg(error): - # type: (WPSException) -> str - return f"{error.text} - code={error.code} - locator={error.locator}" + def _get_err_msg(error, size_limit=None): + # type: (WPSException, Optional[int]) -> str + error_msg = 
Job._get_message(error.text, size_limit=size_limit) + return f"{error_msg} - code={error.code} - locator={error.locator}" def save_log(self, - errors=None, # type: Optional[Union[str, Exception, WPSException, List[WPSException]]] - logger=None, # type: Optional[Logger] - message=None, # type: Optional[str] - level=INFO, # type: AnyLogLevel - status=None, # type: Optional[AnyStatusType] - progress=None, # type: Optional[Number] - ): # type: (...) -> None + errors=None, # type: Optional[Union[str, Exception, WPSException, List[WPSException]]] + logger=None, # type: Optional[Logger] + message=None, # type: Optional[str] + level=INFO, # type: AnyLogLevel + status=None, # type: Optional[AnyStatusType] + progress=None, # type: Optional[Number] + size_limit=None, # type: Optional[int] + ): # type: (...) -> None """ Logs the specified error and/or message, and adds the log entry to the complete job log. @@ -661,6 +673,10 @@ def save_log(self, :param progress: Override progress applied in the logged message entry, but does not set it to the job object. Uses the current :attr:`Job.progress` value if not specified. + :param size_limit: + Log message entries that individually exceed the limit will be clipped with a generic message. + The parameter is provided for convenience, but take note that setting a too large value could cause the + complete :term:`Job` to fail saving to the database if its total size exceeds the document limit. .. note:: The job object is updated with the log but still requires to be pushed to database to actually persist it. 
@@ -668,22 +684,30 @@ def save_log(self, if isinstance(errors, WPSException): errors = [errors] elif isinstance(errors, Exception): - errors = str(errors) + errors = self._get_message(str(errors), size_limit=size_limit) if isinstance(errors, str): - log_msg = [(ERROR, self._get_log_msg(message, status=status, progress=progress))] + log_msg = [(ERROR, self._get_log_msg(message, status=status, progress=progress, size_limit=size_limit))] self.exceptions.append(errors) elif isinstance(errors, list): log_msg = [ - (ERROR, self._get_log_msg(self._get_err_msg(error), status=status, progress=progress)) + ( + ERROR, + self._get_log_msg( + self._get_err_msg(error, size_limit=size_limit), + status=status, + progress=progress, + size_limit=size_limit, + ) + ) for error in errors ] self.exceptions.extend([{ "Code": error.code, "Locator": error.locator, - "Text": error.text + "Text": self._get_message(error.text, size_limit=size_limit), } for error in errors]) else: - log_msg = [(level, self._get_log_msg(message, status=status, progress=progress))] + log_msg = [(level, self._get_log_msg(message, status=status, progress=progress, size_limit=size_limit))] for lvl, msg in log_msg: fmt_msg = get_log_fmt() % dict(asctime=now().strftime(get_log_date_fmt()), levelname=getLevelName(lvl), @@ -1574,7 +1598,7 @@ def image(self): def registry(self): # type: () -> str """ - Obtains the registry entry that must used for ``docker login {registry}``. + Obtains the registry entry that must be used for ``docker login {registry}``. """ return dict.__getitem__(self, "registry") @@ -1847,7 +1871,7 @@ def name(self): def tag(self): # type: () -> str """ - Full identifier including the version for an unique reference. + Full identifier including the version for a unique reference. 
""" proc_id = self.split_version(self.id)[0] # bw-compat, if no version available, no update was applied (single deploy) @@ -2648,7 +2672,7 @@ def status(self, status): (value == QuoteStatus.SUBMITTED and prev != QuoteStatus.SUBMITTED) or (value == QuoteStatus.PROCESSING and prev == QuoteStatus.COMPLETED) ): - LOGGER.error("Cannot revert back to previous quote status (%s => %s)", value, self.status) + LOGGER.error("Cannot revert to previous quote status (%s => %s)", value, self.status) LOGGER.debug(traceback.extract_stack()) return self["status"] = value diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index 33f128b26..b9c6b72de 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -1440,9 +1440,10 @@ def setup_runtime(self): # when process is a docker image, memory monitoring information is obtained with CID file # this file is only generated when the below command is explicitly None (not even when '') "user_space_docker_cmd": None, - # if 'ResourceRequirement' is specified to limit RAM usage, below must be added to ensure it is applied + # if 'ResourceRequirement' is specified to limit RAM/CPU usage, below must be added to ensure it is applied # but don't enable it otherwise, since some defaults are applied which could break existing processes "strict_memory_limit": bool(res_req), + "strict_cpu_limit": bool(res_req), } return runtime_params diff --git a/weaver/store/mongodb.py b/weaver/store/mongodb.py index 4f7f22278..addf6a980 100644 --- a/weaver/store/mongodb.py +++ b/weaver/store/mongodb.py @@ -1144,7 +1144,7 @@ def _apply_duration_filter(pipeline, min_duration, max_duration): # duration is not directly stored in the database (as it can change), it must be computed inplace duration_field = { "$addFields": { - "duration": { # becomes 'null' if cannot be computed (e.g.: not started) + "duration": { # becomes 'null' if it cannot be computed (e.g.: not started) "$dateDiff": { # compute the same way as 
Job.duration "startDate": "$started",