From 892326b60c96873242cabcb6daee4e9633d21c96 Mon Sep 17 00:00:00 2001 From: Carl Baillargeon Date: Thu, 16 May 2024 17:40:28 -0400 Subject: [PATCH] Added max-concurrency CLI option --- anta/cli/nrfu/__init__.py | 10 ++++++++++ anta/result_manager/models.py | 4 ++++ anta/runner.py | 27 ++++++++++++++++++--------- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/anta/cli/nrfu/__init__.py b/anta/cli/nrfu/__init__.py index 6a67e609f..6a44ffd33 100644 --- a/anta/cli/nrfu/__init__.py +++ b/anta/cli/nrfu/__init__.py @@ -107,12 +107,21 @@ def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]: is_flag=True, default=False, ) +@click.option( + "--max-concurrency", + help="Maximum number of tests to run concurrently.", + type=int, + show_envvar=True, + default=50000, + show_default=True, +) # pylint: disable=too-many-arguments def nrfu( ctx: click.Context, inventory: AntaInventory, tags: set[str] | None, catalog: AntaCatalog, + max_concurrency: int, device: tuple[str], test: tuple[str], hide: tuple[str], @@ -138,6 +147,7 @@ def nrfu( ctx.obj["result_manager"], inventory, catalog, + max_concurrency=max_concurrency, tags=tags, devices=set(device) if device else None, tests=set(test) if test else None, diff --git a/anta/result_manager/models.py b/anta/result_manager/models.py index c53947ee4..7e98c01a8 100644 --- a/anta/result_manager/models.py +++ b/anta/result_manager/models.py @@ -5,6 +5,8 @@ from __future__ import annotations +from typing import Any + from pydantic import BaseModel from anta.custom_types import TestStatus @@ -32,6 +34,8 @@ class TestResult(BaseModel): result: TestStatus = "unset" messages: list[str] = [] custom_field: str | None = None + json_output: dict[str, Any] | None = None + text_output: str | None = None def is_success(self, message: str | None = None) -> None: """Set status to success. diff --git a/anta/runner.py b/anta/runner.py index 668353e2f..7110ec023 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -30,7 +30,6 @@ logger = logging.getLogger(__name__) DEFAULT_NOFILE = 16384 -MAXIMUM_TEST_CONCURRENCY = 20000 def adjust_rlimit_nofile() -> tuple[int, int]: @@ -93,7 +92,8 @@ async def run_tests(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResu ------ TestResult: The result of each completed test. """ - aws = aiter(tests_generator) + # NOTE: The `aiter` built-in function is not available in Python 3.9 + aws = tests_generator.__aiter__() # pylint: disable=unnecessary-dunder-call aws_ended = False pending: set[Future[TestResult]] = set() @@ -101,8 +101,8 @@ async def run_tests(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResu # Add tests to the pending set until the limit is reached or no more tests are available while len(pending) < limit and not aws_ended: try: - # Get the next test coroutine from the generator - aw = await anext(aws) + # NOTE: The `anext` built-in function is not available in Python 3.9 + aw = await aws.__anext__() # pylint: disable=unnecessary-dunder-call except StopAsyncIteration: # noqa: PERF203 aws_ended = True logger.debug("All tests have been added to the pending set.") @@ -246,6 +246,7 @@ async def main( # noqa: PLR0913 manager: ResultManager, inventory: AntaInventory, catalog: AntaCatalog, + max_concurrency: int = 50000, devices: set[str] | None = None, tests: set[str] | None = None, tags: set[str] | None = None, @@ -264,6 +265,7 @@ async def main( # noqa: PLR0913 manager: ResultManager object to populate with the test results. inventory: AntaInventory object that includes the device(s). catalog: AntaCatalog object that includes the list of tests. + max_concurrency: Maximum number of tests to run concurrently. Default is 50000. devices: Devices on which to run tests. None means all devices. These may come from the `--device / -d` CLI option in NRFU. tests: Tests to run against devices. None means all tests. These may come from the `--test / -t` CLI option in NRFU. tags: Tags to filter devices from the inventory. These may come from the `--tags` CLI option in NRFU. @@ -289,18 +291,25 @@ async def main( # noqa: PLR0913 return run_info = ( - "--- ANTA NRFU Run Information ---\n" + "------------------------------------ ANTA NRFU Run Information -------------------------------------\n" f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n" f"Total number of selected tests: {catalog.final_tests_count}\n" + f"Maximum number of tests to run concurrently: {max_concurrency}\n" f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n" - "---------------------------------" + "----------------------------------------------------------------------------------------------------" ) logger.info(run_info) - if catalog.final_tests_count > limits[0]: + if catalog.final_tests_count > max_concurrency: logger.warning( - "The number of concurrent tests is higher than the open file descriptors limit for this ANTA process.\n" + "The total number of tests is higher than the maximum number of tests to run concurrently.\n" + "ANTA will be throttled to run at the maximum number of tests to run concurrently to ensure system stability.\n" + "Please consult the ANTA FAQ." + ) + if max_concurrency > limits[0]: + logger.warning( + "The maximum number of tests to run concurrently is higher than the open file descriptors limit for this ANTA process.\n" "Errors may occur while running the tests.\n" "Please consult the ANTA FAQ." ) @@ -317,7 +326,7 @@ async def main( # noqa: PLR0913 AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=catalog.final_tests_count) with Catchtime(logger=logger, message="Running ANTA tests"): - async for result in run_tests(tests_generator, limit=MAXIMUM_TEST_CONCURRENCY): + async for result in run_tests(tests_generator, limit=max_concurrency): manager.add(result) log_cache_statistics(selected_inventory.devices)