Skip to content

Commit

Permalink
add ResourceRequirements tests [partial] (relates to #138) + ensure b…
Browse files Browse the repository at this point in the history
…ig job log messages are clipped
  • Loading branch information
fmigneault committed Nov 29, 2022
1 parent 4d3ad62 commit eaa129b
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 29 deletions.
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Changes

Changes:
--------
- Add `Job` log message size checks to better control what gets logged during the `Application Package` execution to
  avoid large documents causing problems when attempting to save them to the storage database.
- Update documentation with examples for ``cwltool:CUDARequirement``, ``ResourceRequirement`` and ``NetworkAccess``.
- Improve schema definition of ``ResourceRequirement``.
- Deprecate ``DockerGpuRequirement``, with attempts to auto-convert it into corresponding ``DockerRequirement``
Expand All @@ -27,7 +29,6 @@ Fixes:
Use ``packaging.version.Version`` substitute whenever possible, but preserve backward
compatibility with ``distutils`` in case of older Python not supporting it.
- Fix ``cli._update_files`` so there are no attempts to upload remote references to the `Vault`.
- No change.

.. _changes_4.27.0:

Expand Down
2 changes: 1 addition & 1 deletion tests/functional/application-packages/ReadFile/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ outputTransmission:
- reference
executionUnit:
# note: This does not work by itself! The test suite injects the file dynamically.
- href: "tests/functional/application-packages/CatValue/cat.cwl"
- href: "tests/functional/application-packages/ReadValue/package.cwl"
deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication"
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# YAML representation supported by WeaverClient
# Deployment body for the 'SimulateResourceUsage' test process used by resource-requirement tests.
processDescription:
  id: SimulateResourceUsage
  title: Gradually allocate RAM to simulate a process load.
  version: "1.0"
  keywords:
    - test
  # only asynchronous execution is advertised since the process intentionally runs for a while
  jobControlOptions:
    - async-execute
  outputTransmission:
    - reference
executionUnit:
  # note: This does not work by itself! The test suite injects the file dynamically.
  - href: "tests/functional/application-packages/SimulateResourceUsage/package.cwl"
deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Execution inputs for the 'SimulateResourceUsage' test process.
# Per the process script: 'ram_chunks' is the size (MiB) of each allocated chunk,
# 'ram_amount' is the number of allocation increments, and times are in seconds.
ram_amount: 4
ram_chunks: 16
time_duration: 1
time_interval: 0.25
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env cwl-runner
# WARNING:
# This process can generate a large memory load and a very large output file that captures the generated data.
# Even with default ResourceRequirement values, the output can rapidly become substantial.
cwlVersion: "v1.0"
class: CommandLineTool
baseCommand:
  - bash
  - script.sh
requirements:
  DockerRequirement:
    dockerPull: debian:stretch-slim
  InitialWorkDirRequirement:
    listing:
      # below script is generated dynamically in the working directory, and then called by the base command
      # reference: https://unix.stackexchange.com/a/254976/288952
      - entryname: script.sh
        entry: |
          echo "Will allocate RAM chunks of $(inputs.ram_chunks) MiB."
          echo "Will allocate RAM chunks in increments up to $(inputs.ram_amount) times."
          echo "Will maintain allocated RAM load for $(inputs.time_duration)s for each increment."
          echo "Will wait $(inputs.time_interval)s between each allocation."
          echo "Begin allocating memory..."
          for index in \$(seq $(inputs.ram_amount)); do
            echo "Allocating \$index x $(inputs.ram_chunks) MiB for $(inputs.time_duration)s..."
            cat <( </dev/zero head -c \$((\$index * $(inputs.ram_chunks)))m) <(sleep $(inputs.time_duration)) | tail
            echo "Waiting for $(inputs.time_interval)s..."
            sleep $(inputs.time_interval)
          done
          echo "Finished allocating memory..."
inputs:
  # size (MiB) of each RAM chunk allocated per increment
  ram_chunks:
    type: int
  # number of allocation increments to perform
  ram_amount:
    type: int
  # seconds to maintain each allocated chunk before releasing it
  time_duration:
    type: float
  # seconds to wait between successive allocations
  time_interval:
    type: float
outputs:
  output:
    type: File
    outputBinding:
      glob: "stdout.log"
stdout: stdout.log
104 changes: 103 additions & 1 deletion tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import json
import logging
import os
import shutil
import tempfile
from copy import deepcopy
from inspect import cleandoc
Expand Down Expand Up @@ -56,6 +57,7 @@
CWL_REQUIREMENT_APP_DOCKER,
CWL_REQUIREMENT_INIT_WORKDIR,
CWL_REQUIREMENT_INLINE_JAVASCRIPT,
CWL_REQUIREMENT_RESOURCE,
ProcessSchema
)
from weaver.processes.types import ProcessType
Expand All @@ -66,7 +68,7 @@
if TYPE_CHECKING:
from typing import List

from weaver.typedefs import JSON, CWL_AnyRequirements
from weaver.typedefs import JSON, CWL_AnyRequirements, CWL_RequirementsDict, Number

EDAM_PLAIN = EDAM_NAMESPACE + ":" + EDAM_MAPPING[ContentType.TEXT_PLAIN]
OGC_NETCDF = OGC_NAMESPACE + ":" + OGC_MAPPING[ContentType.APP_NETCDF]
Expand Down Expand Up @@ -2500,6 +2502,106 @@ def test_execute_with_directory_output(self):
output_dir_files = {os.path.join(root, file) for root, _, files in os.walk(out_dir) for file in files}
assert output_dir_files == expect_out_files

@parameterized.expand([
    # all values in MiB / seconds accordingly
    # (expect_fail, expect_ram_min_mb, expect_size_min_mb,
    #  ram_chunks_mb, ram_amount_mb, time_duration_s, time_interval_s, resource_requirement)
    (False, 48, 96, 16, 3, 0.25, 0.25, {}),
    (False, 48, 36, 4, 4, 0.25, 0.25, {CWL_REQUIREMENT_RESOURCE: {"ramMax": 52}}),
    # FIXME: ensure ResourceRequirements are effective (https://github.com/crim-ca/weaver/issues/138)
    # (True, 48, 96, 4, 2, 0.25, 0.25, {CWL_REQUIREMENT_RESOURCE: {"ramMax": 2}}),  # FIXME: hangs forever
    # (True, 48, 96, 4, 2, 0.25, 0.25, {CWL_REQUIREMENT_RESOURCE: {"outdirMax": 2}}),  # FIXME: not failing
    (True, 48, 96, 4, 2, 0.25, 0.25, {CWL_REQUIREMENT_RESOURCE: {"outdirMax": 16}}),
])
def test_execute_with_resource_requirement(self,
                                           expect_fail,            # type: bool
                                           expect_ram_min_mb,      # type: int
                                           expect_size_min_mb,     # type: int
                                           ram_chunks_mb,          # type: int
                                           ram_amount_mb,          # type: int
                                           time_duration_s,        # type: Number
                                           time_interval_s,        # type: Number
                                           resource_requirement,   # type: CWL_RequirementsDict
                                           ):                      # type: (...) -> None
    """
    Test that :data:`CWL_REQUIREMENT_RESOURCE` are considered for :term:`Process` execution.

    .. note::
        This test also conveniently serves for testing how large :term:`Job` logs are handled by the storage.
        Because of the large output produced and captured in the logs, saving them directly to the database
        is not supported. The :term:`Job` should therefore filter problematic entries to the log.
    """
    proc = "SimulateResourceUsage"
    body = self.retrieve_payload(proc, "deploy", local=True)
    pkg = self.retrieve_payload(proc, "package", local=True)
    # inject the parametrized resource requirement into the CWL package before deployment
    pkg["requirements"].update(resource_requirement)
    body["executionUnit"] = [{"unit": pkg}]
    self.deploy_process(body, describe_schema=ProcessSchema.OGC)

    exec_body = {
        "mode": ExecuteMode.ASYNC,
        "response": ExecuteResponse.DOCUMENT,
        "inputs": {
            "ram_chunks": ram_chunks_mb,
            "ram_amount": ram_amount_mb,
            "time_duration": time_duration_s,
            "time_interval": time_interval_s,
        },
        "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.REFERENCE}]
    }
    out_dir = None
    try:
        with contextlib.ExitStack() as stack:
            # patch celery execution so the job runs inline within the test process
            for mock_exec in mocked_execute_celery():
                stack.enter_context(mock_exec)
            proc_url = f"/processes/{proc}/jobs"
            resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5,
                                       data=exec_body, headers=self.json_headers, only_local=True)
            assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"
            status_url = resp.json["location"]
            job_id = resp.json["jobID"]
            wps_dir = get_wps_output_dir(self.settings)
            out_dir = os.path.join(wps_dir, job_id, "output")

            # wait for completion; some parametrized cases expect the job to fail on resource limits
            results = self.monitor_job(status_url, expect_failed=expect_fail)
            assert "output" in results
            out_log = os.path.join(out_dir, "stdout.log")
            assert os.path.isfile(out_log)
            # the captured stdout must at least reach the expected size generated by the RAM allocations
            assert os.stat(out_log).st_size >= expect_size_min_mb * 2**20
            with open(out_log, mode="r", encoding="utf-8") as out_file:
                # skip NUL-prefixed lines produced by the '/dev/zero' data captured in the output
                output = (line for line in out_file.readlines() if line[0] != "\0")
                output = list(output)
            # every allocation increment must have been reported in the raw output file
            assert all(
                any(f"Allocating {i} x {ram_chunks_mb} MiB" in line for line in output)
                for i in range(1, ram_amount_mb + 1)
            )

            # job logs must report each increment, but with oversized entries clipped by the storage guard
            log_url = f"{status_url}/logs"
            log_resp = mocked_sub_requests(self.app, "get", log_url, timeout=5,
                                           headers=self.json_headers, only_local=True)
            job_logs = log_resp.json
            assert all(
                any(f"Allocating {i} x {ram_chunks_mb} MiB" in line for line in job_logs)
                for i in range(1, ram_amount_mb + 1)
            )
            assert all(
                any(
                    f"<message clipped due to large dimension ({i * ram_chunks_mb:.2f} MiB)>"
                    in line for line in job_logs
                )
                for i in range(1, ram_amount_mb + 1)
            )

            # process statistics must reflect at least the expected memory usage of the allocations
            stat_url = f"{status_url}/statistics"
            stat_resp = mocked_sub_requests(self.app, "get", stat_url, timeout=5,
                                            headers=self.json_headers, only_local=True)
            job_stats = stat_resp.json
            assert all(
                job_stats["process"][mem] > expect_ram_min_mb
                for mem in ["rssBytes", "ussBytes", "vmsBytes"]
            )
    finally:
        # cleanup the (potentially very large) generated output regardless of test result
        if out_dir:
            shutil.rmtree(out_dir, ignore_errors=True)

# FIXME: create a real async test (threading/multiprocess) to evaluate this correctly
def test_dismiss_job(self):
"""
Expand Down
72 changes: 48 additions & 24 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from weaver.utils import localize_datetime # for backward compatibility of previously saved jobs not time-locale-aware
from weaver.utils import (
VersionFormat,
apply_number_with_unit,
as_version_major_minor_patch,
extend_instance,
fully_qualified_name,
Expand Down Expand Up @@ -378,7 +379,7 @@ def wps(self, container=None, **kwargs):
try:
_wps = self.get("_wps")
if _wps is None:
# client retrieval could also be cached if recently fetched an not yet invalidated
# client retrieval could also be cached if recently fetched and not yet invalidated
self["_wps"] = _wps = get_wps_client(self.url, container=container, **kwargs)
return _wps
except (OWSServiceException, xml_util.ParseError) as exc:
Expand Down Expand Up @@ -579,7 +580,7 @@ def check_accessible(self, settings, ignore=True):
meth = "HEAD"
url = self.url
# - allow 500 for services that incorrectly handle invalid request params, but at least respond
# (should be acceptable in this case because the 'ping' request is not necessarily well formed)
# (should be acceptable in this case because the 'ping' request is not necessarily well-formed)
# - allow 400/405 for bad request/method directly reported by the service for the same reasons
# - enforce quick timeout (but don't allow 408 code) to avoid long pending connexions that never resolve
allowed_codes = [200, 400, 405, 500]
Expand Down Expand Up @@ -620,27 +621,38 @@ def __init__(self, *args, **kwargs):
if not isinstance(self.id, (str, uuid.UUID)):
raise TypeError(f"Type 'str' or 'UUID' is required for '{self.__name__}.id'")

def _get_log_msg(self, msg=None, status=None, progress=None):
# type: (Optional[str], Optional[AnyStatusType], Optional[Number]) -> str
if not msg:
msg = self.status_message
@staticmethod
def _get_message(message, size_limit=None):
    # type: (str, Optional[int]) -> str
    """
    Clip the message with a generic placeholder if it exceeds the size limit.

    :param message: Original message to validate against the size limit.
    :param size_limit:
        Maximum size (in bytes) allowed for the message.
        Defaults to 1 MiB when unspecified, non-integer, or not strictly positive.
    :return: Original message if within the limit, or a placeholder reporting the clipped message size.
    """
    msg_len = len(message)
    # default to 1 MiB to keep individual entries well below database document size limits
    size_limit = size_limit if isinstance(size_limit, int) and size_limit > 0 else 1024**2
    if msg_len > size_limit:  # reuse the already computed length instead of recomputing it
        msg_size = apply_number_with_unit(msg_len, binary=True, decimals=2)
        return f"<message clipped due to large dimension ({msg_size})>"
    return message

def _get_log_msg(self, msg=None, status=None, progress=None, size_limit=None):
    # type: (Optional[str], Optional[AnyStatusType], Optional[Number], Optional[int]) -> str
    """
    Generate a formatted log entry for the job, clipping the message if it exceeds the size limit.
    """
    # fall back on the job's current status message; clip whichever message is used
    msg = self._get_message(msg or self.status_message, size_limit=size_limit)
    status = map_status(status or self.status)
    # clamp progress within [0, 100], defaulting to the job's current progress
    progress = max(0, min(100, progress or self.progress))
    return get_job_log_msg(duration=self.duration_str, progress=progress, status=status, message=msg)

@staticmethod
def _get_err_msg(error):
# type: (WPSException) -> str
return f"{error.text} - code={error.code} - locator={error.locator}"
def _get_err_msg(error, size_limit=None):
# type: (WPSException, Optional[int]) -> str
error_msg = Job._get_message(error.text, size_limit=size_limit)
return f"{error_msg} - code={error.code} - locator={error.locator}"

def save_log(self,
errors=None, # type: Optional[Union[str, Exception, WPSException, List[WPSException]]]
logger=None, # type: Optional[Logger]
message=None, # type: Optional[str]
level=INFO, # type: AnyLogLevel
status=None, # type: Optional[AnyStatusType]
progress=None, # type: Optional[Number]
): # type: (...) -> None
errors=None, # type: Optional[Union[str, Exception, WPSException, List[WPSException]]]
logger=None, # type: Optional[Logger]
message=None, # type: Optional[str]
level=INFO, # type: AnyLogLevel
status=None, # type: Optional[AnyStatusType]
progress=None, # type: Optional[Number]
size_limit=None, # type: Optional[int]
): # type: (...) -> None
"""
Logs the specified error and/or message, and adds the log entry to the complete job log.
Expand All @@ -661,29 +673,41 @@ def save_log(self,
:param progress:
Override progress applied in the logged message entry, but does not set it to the job object.
Uses the current :attr:`Job.progress` value if not specified.
:param size_limit:
Log message entries that individually exceed the limit will be clipped with a generic message.
The parameter is provided for convenience, but take note that setting a too large value could cause the
complete :term:`Job` to fail saving to the database if its total size exceeds the document limit.
.. note::
The job object is updated with the log but still requires to be pushed to database to actually persist it.
"""
if isinstance(errors, WPSException):
errors = [errors]
elif isinstance(errors, Exception):
errors = str(errors)
errors = self._get_message(str(errors), size_limit=size_limit)
if isinstance(errors, str):
log_msg = [(ERROR, self._get_log_msg(message, status=status, progress=progress))]
log_msg = [(ERROR, self._get_log_msg(message, status=status, progress=progress, size_limit=size_limit))]
self.exceptions.append(errors)
elif isinstance(errors, list):
log_msg = [
(ERROR, self._get_log_msg(self._get_err_msg(error), status=status, progress=progress))
(
ERROR,
self._get_log_msg(
self._get_err_msg(error, size_limit=size_limit),
status=status,
progress=progress,
size_limit=size_limit,
)
)
for error in errors
]
self.exceptions.extend([{
"Code": error.code,
"Locator": error.locator,
"Text": error.text
"Text": self._get_message(error.text, size_limit=size_limit),
} for error in errors])
else:
log_msg = [(level, self._get_log_msg(message, status=status, progress=progress))]
log_msg = [(level, self._get_log_msg(message, status=status, progress=progress, size_limit=size_limit))]
for lvl, msg in log_msg:
fmt_msg = get_log_fmt() % dict(asctime=now().strftime(get_log_date_fmt()),
levelname=getLevelName(lvl),
Expand Down Expand Up @@ -1574,7 +1598,7 @@ def image(self):
def registry(self):
# type: () -> str
"""
Obtains the registry entry that must used for ``docker login {registry}``.
Obtains the registry entry that must be used for ``docker login {registry}``.
"""
return dict.__getitem__(self, "registry")

Expand Down Expand Up @@ -1847,7 +1871,7 @@ def name(self):
def tag(self):
# type: () -> str
"""
Full identifier including the version for an unique reference.
Full identifier including the version for a unique reference.
"""
proc_id = self.split_version(self.id)[0]
# bw-compat, if no version available, no update was applied (single deploy)
Expand Down Expand Up @@ -2648,7 +2672,7 @@ def status(self, status):
(value == QuoteStatus.SUBMITTED and prev != QuoteStatus.SUBMITTED) or
(value == QuoteStatus.PROCESSING and prev == QuoteStatus.COMPLETED)
):
LOGGER.error("Cannot revert back to previous quote status (%s => %s)", value, self.status)
LOGGER.error("Cannot revert to previous quote status (%s => %s)", value, self.status)
LOGGER.debug(traceback.extract_stack())
return
self["status"] = value
Expand Down
3 changes: 2 additions & 1 deletion weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -1440,9 +1440,10 @@ def setup_runtime(self):
# when process is a docker image, memory monitoring information is obtained with CID file
# this file is only generated when the below command is explicitly None (not even when '')
"user_space_docker_cmd": None,
# if 'ResourceRequirement' is specified to limit RAM usage, below must be added to ensure it is applied
# if 'ResourceRequirement' is specified to limit RAM/CPU usage, below must be added to ensure it is applied
# but don't enable it otherwise, since some defaults are applied which could break existing processes
"strict_memory_limit": bool(res_req),
"strict_cpu_limit": bool(res_req),
}
return runtime_params

Expand Down
2 changes: 1 addition & 1 deletion weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ def _apply_duration_filter(pipeline, min_duration, max_duration):
# duration is not directly stored in the database (as it can change), it must be computed inplace
duration_field = {
"$addFields": {
"duration": { # becomes 'null' if cannot be computed (e.g.: not started)
"duration": { # becomes 'null' if it cannot be computed (e.g.: not started)
"$dateDiff": {
# compute the same way as Job.duration
"startDate": "$started",
Expand Down

0 comments on commit eaa129b

Please sign in to comment.