Skip to content

Commit

Permalink
QE-10885 Refactor multiprocessing pool (#318)
Browse files Browse the repository at this point in the history
The way cucu does multiprocessing has bugs. This PR refactored the logic
related to multiprocessing. It also adds a unit test and a functional
test to verify the change.
  • Loading branch information
ddl-xin authored Mar 9, 2023
1 parent ceb89cd commit ff55cbd
Show file tree
Hide file tree
Showing 7 changed files with 380 additions and 240 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ commands:
set +eo pipefail
poetry run coverage combine .coverage.*
poetry run coverage html
poetry run coverage report --fail-under=85
poetry run coverage report --fail-under=70
executors:
unit-test-executor:
Expand Down
8 changes: 8 additions & 0 deletions features/cli/run.feature
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,11 @@ Feature: Run
"""
runtime timeout reached, aborting run
"""
@runtime-timeout @workers
Scenario: User can run with a runtime timeout and workers to avoid running over a certain amount of time
Given I run the command "cucu run data/features/slow_features --workers 2 --runtime-timeout 10 --results {CUCU_RESULTS_DIR}/runtime_timeout_with_workers_timed_out_results" and save stdout to "STDOUT" and expect exit code "1"
Then I should see the previous step took less than "11" seconds
And I should see "{STDOUT}" contains the following:
"""
runtime timeout reached, aborting run
"""
440 changes: 254 additions & 186 deletions poetry.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cucu"
version = "0.122.0"
version = "0.123.0"
license = "MIT"
description = ""
authors = ["Rodney Gomes <[email protected]>"]
Expand Down Expand Up @@ -32,15 +32,15 @@ beautifulsoup4 = "^4.11.1"
ansi2html = "^1.7.0"
jellyfish = "^0.9.0"

[tool.poetry.dev-dependencies]
pytest = "^5.4.3"
[tool.poetry.scripts]
cucu = "cucu.cli:main"

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.2"
nox = "^2020.5.24"
flake8 = "^4.0.1"
ipdb = "^0.13.9"

[tool.poetry.scripts]
cucu = "cucu.cli:main"

[build-system]
requires = ["poetry-core>=1.3.2"]
build-backend = "poetry.core.masonry.api"
117 changes: 70 additions & 47 deletions src/cucu/cli/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os

from click import ClickException
from collections import namedtuple
from cucu import (
fuzzy,
init_global_hook_variables,
Expand Down Expand Up @@ -325,68 +326,90 @@ def cancel_timer(_):
feature_filepaths = [filepath]

with multiprocessing.Pool(int(workers)) as pool:
# Each feature file is applied to the pool as an async task.
# It then polls the async result of each task. If the result
# is ready, it removes the result from the list of results that
# need to be checked again until all the results are checked.
# If the timer is triggered, it stops the while loop and
# logs all the unfinished features.
# The pool is terminated automatically when it exits the
# context.
timer = None
timeout_reached = False
if runtime_timeout:
logger.debug("setting up runtime timeout timer")

def runtime_exit():
nonlocal timeout_reached
logger.error("runtime timeout reached, aborting run")
timeout_filepath = os.path.join(
results, "runtime-timeout"
)
open(timeout_filepath, "w").close()

for child in multiprocessing.active_children():
os.kill(child.pid, signal.SIGINT)
os.kill(child.pid, signal.SIGTERM)
timeout_reached = True

timer = Timer(runtime_timeout, runtime_exit)
timer.start()

def cancel_timer(_):
logger.debug("cancelled runtime timeout timer")
timer.cancel()

register_after_all_hook(cancel_timer)

AsyncResult = namedtuple("AsyncResult", ["feature", "result"])
async_results = []
for feature_filepath in feature_filepaths:
async_results.append(
pool.apply_async(
behave,
[
feature_filepath,
color_output,
dry_run,
env,
fail_fast,
headless,
name,
ipdb_on_failure,
junit,
results,
secrets,
show_skips,
tags,
verbose,
],
{
"redirect_output": True,
},
)
result = pool.apply_async(
behave,
[
feature_filepath,
color_output,
dry_run,
env,
fail_fast,
headless,
name,
ipdb_on_failure,
junit,
results,
secrets,
show_skips,
tags,
verbose,
],
{
"redirect_output": True,
},
)
async_results.append(AsyncResult(feature_filepath, result))

workers_failed = False
for result in async_results:
exit_code = result.get(runtime_timeout)
if exit_code != 0:
workers_failed = True

if timer:
timer.cancel()

pool.close()
pool.join()
while not timeout_reached:
remaining = []
for feature_result in async_results:
feature, result = feature_result
if result.ready():
try:
exit_code = result.get()
if exit_code != 0:
workers_failed = True
except Exception:
logger.exception(
f"an exception is raised during feature {feature}"
)
workers_failed = True
else:
remaining.append(feature_result)

if len(remaining) == 0:
if timer:
timer.cancel()
break

async_results = remaining
time.sleep(1)
else:
# Report every feature that was still running when the timeout fired.
# The header has to be joined to the list explicitly: adjacent string
# literals are concatenated *before* `.join` is applied, so writing
# "features not finished:\n" "\n".join(...) turns the header into the
# separator and drops it entirely when only one feature is unfinished.
unfinished_features = "\n".join(
    "  " + feature_result.feature for feature_result in async_results
)
logger.error(f"features not finished:\n{unfinished_features}")
workers_failed = True

if workers_failed:
raise RuntimeError(
Expand Down
11 changes: 11 additions & 0 deletions src/cucu/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,14 @@ def error(*args, **kwargs):
CONFIG["__CUCU_WROTE_TO_OUTPUT"] = True

logging.getLogger().error(*args, **kwargs)


@wraps(logging.exception)
def exception(*args, **kwargs):
    # Thin wrapper around the root logger's `exception` that also records
    # whether the message is visible on the console. All arguments are
    # forwarded untouched. (No docstring here: `wraps` replaces `__doc__`
    # with the one from `logging.exception`.)
    root_logger = logging.getLogger()

    # The first handler on the root logger is treated as the console handler;
    # an exception record is logged at ERROR level, so it is only emitted by
    # that handler when its level is ERROR or lower.
    if root_logger.handlers[0].level <= logging.ERROR:
        CONFIG["__CUCU_WROTE_TO_OUTPUT"] = True

    root_logger.exception(*args, **kwargs)
30 changes: 30 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import time

from click.testing import CliRunner
from cucu.cli import core
from multiprocessing import Process
from unittest import mock


# Runtime timeout, in seconds, passed to `cucu run --runtime-timeout` by the
# test in this module.
RUNTIME_TIMEOUT = 10


def process_func_to_run_cucu(workers, runtime_timeout):
    """Invoke `cucu run` in-process with `behave` mocked to always raise.

    Intended to be used as the target of a separate process.

    Args:
        workers: value passed to the `--workers` option.
        runtime_timeout: value passed to the `--runtime-timeout` option.
    """
    cli_args = f"--runtime-timeout {runtime_timeout} --workers={workers} abc"

    # "abc" is reported as not being a directory.
    isdir_patch = mock.patch(
        "os.path.isdir",
        mock.MagicMock(return_value=False),
    )
    # Every call to behave fails with a RuntimeError.
    behave_patch = mock.patch(
        "cucu.cli.core.behave",
        mock.MagicMock(side_effect=RuntimeError("something bad")),
    )

    with isdir_patch, behave_patch:
        CliRunner().invoke(core.run, cli_args)


def test_timeout_with_behave_exception_in_workers():
    """Check that a failing worker ends the run well before the runtime timeout.

    `cucu run` is started in a child process with `behave` mocked to raise.
    The run must finish in under 5 seconds, i.e. noticeably sooner than the
    RUNTIME_TIMEOUT handed to the command.
    """
    # Upper bound on how long to wait for the child. Without it a regression
    # that makes the run hang would block the whole test suite instead of
    # failing this test.
    join_timeout = RUNTIME_TIMEOUT * 2

    # Use a monotonic clock so system clock adjustments cannot skew the
    # measured duration.
    start = time.monotonic()
    p = Process(target=process_func_to_run_cucu, args=(2, RUNTIME_TIMEOUT))
    p.start()
    p.join(join_timeout)
    elapsed_time = time.monotonic() - start

    if p.is_alive():
        # The child never finished: clean it up so it does not outlive the
        # test; the assertion below then fails on the elapsed time.
        p.terminate()
        p.join()

    assert elapsed_time < 5

0 comments on commit ff55cbd

Please sign in to comment.