Skip to content

Commit

Permalink
some rework on remote service & remote resource
Browse files Browse the repository at this point in the history
- turn remote service start into a runnable step, thus delaying remote
  driver construction on the remote host
- fix certain remote command generation utils
- explicitly disallow remote resource usage on Windows hosts
- fix empty report check in certain exporters
- add cleanup for the dangling symlink left by workspace imitation on the
  remote host, rearrange some source in remote resource as well
- fix a bug in driver dependency graph type check
  • Loading branch information
zhenyu-ms committed Feb 12, 2025
1 parent 2bf9814 commit 050a1eb
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 43 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
"gherkin-official==4.1.3",
"parse",
"orjson; python_version>='3.8'",
"flask-orjson; python_version>='3.8'"
"flask-orjson; python_version>='3.8'",
"exceptiongroup"
]
requires-python = ">=3.7"

Expand Down
34 changes: 26 additions & 8 deletions testplan/common/remote/remote_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,38 @@ class RemoteDriver:

def __init__(self, remote_service, driver_cls, **options):
    """
    Record everything needed to build the driver later.

    Nothing is looked up through the rpyc connection here; the driver
    handle is created on first attribute access by ``_init_handle``.

    :param remote_service: object providing ``rpyc_connection`` and
        ``_remote_resource_runpath`` (both read by ``_init_handle``)
    :param driver_cls: driver class; its ``__module__`` and ``__name__``
        are used to find the matching class through the rpyc connection
    :param options: keyword arguments passed to the driver constructor,
        must contain ``name``
    """
    # __setattr__ is overridden to forward to the handle, so internal state
    # has to be written straight into the instance dict.
    self.__dict__["_remote_service"] = remote_service
    self.__dict__["_driver_cls"] = driver_cls
    self.__dict__["_options"] = options
    self.__dict__["_handle"] = None

def _init_handle(self):
    """
    Build the driver through the rpyc connection and store it as ``_handle``.

    The class is resolved by module and class name via
    ``rpyc_connection.modules``, and the ``runpath`` option is set to
    ``<remote resource runpath>/<driver name>`` before construction.
    """
    rmt_modules = self._remote_service.rpyc_connection.modules
    rmt_driver_cls = getattr(
        rmt_modules[self._driver_cls.__module__],
        self._driver_cls.__name__,
    )
    # Item assignment on the options dict, so __setattr__ is not involved.
    self._options["runpath"] = "/".join(
        [
            self._remote_service._remote_resource_runpath,
            self._options["name"],
        ]
    )
    # Written via __dict__ because __setattr__ forwards to the handle.
    self.__dict__["_handle"] = rmt_driver_cls(**self._options)

def uid(self) -> str:
    """
    Return the uid of the wrapped driver.

    A throwaway instance of the locally available driver class is built
    from the stored options and asked for its uid, so the handle does not
    have to exist yet.
    """
    # driver uid needed in env constructing check
    # whether remote or local, uid should be the same since it's cfg based
    # XXX: would it be a bad idea to have it inited locally?
    return self._driver_cls(**self._options).uid()

def __getattr__(self, name):
    """
    Forward attribute lookups that fail locally to the driver handle.

    The handle is created on first use.

    :param name: attribute name that normal lookup did not find
    :raises AttributeError: if the instance has not been initialised yet
    """
    # __getattr__ only runs when normal lookup failed. If "_handle" itself
    # is missing (instance created without __init__, e.g. while copy or
    # pickle probe for special methods), reading self._handle below would
    # re-enter this method forever.
    if "_handle" not in self.__dict__:
        raise AttributeError(name)
    if self._handle is None:
        self._init_handle()
    return getattr(self._handle, name)

def __setattr__(self, name, value):
    """
    Forward attribute assignment to the driver handle.

    The handle is created first when it does not exist yet.
    """
    handle = self._handle
    if handle is None:
        self._init_handle()
        handle = self._handle
    setattr(handle, name, value)
69 changes: 50 additions & 19 deletions testplan/common/remote/remote_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def get_options(cls):
ConfigOption("remote_runpath", default=None): str,
ConfigOption("testplan_path", default=None): str,
ConfigOption("remote_workspace", default=None): str,
# proposing cfg clobber_remote_workspace,
# to overwrite what's in remote_workspace when set to True
ConfigOption("clean_remote", default=False): bool,
ConfigOption("push", default=[]): Or(list, None),
ConfigOption("push_exclude", default=[]): Or(list, None),
Expand Down Expand Up @@ -152,33 +154,46 @@ def __init__(
status_wait_timeout: int = 60,
**options,
) -> None:
if IS_WIN:
raise NotImplementedError(
"RemoteResource not supported on Windows hosts."
)

if not worker_is_remote(remote_host):
# TODO: allow connecting to local for testing purpose?
raise RuntimeError(
"Cannot create remote resource on the same host that Testplan runs."
)
options.update(self.filter_locals(locals()))
super(RemoteResource, self).__init__(**options)

self.ssh_cfg = {
"host": self.cfg.remote_host,
"port": self.cfg.ssh_port,
}
if 0 != self._execute_cmd_remote("uname"):
raise NotImplementedError(
"RemoteResource not supported on Windows hosts."
)

self._remote_plan_runpath = None
self._remote_resource_runpath = None
self._child_paths = _LocationPaths()
self._testplan_import_path = _LocationPaths()
self._workspace_paths = _LocationPaths()
self._working_dirs = _LocationPaths()

self.ssh_cfg = {
"host": self.cfg.remote_host,
"port": self.cfg.ssh_port,
}

self.setup_metadata = WorkerSetupMetadata()
self._user = getpass.getuser()
self.python_binary = (
os.environ["PYTHON3_REMOTE_BINARY"] if IS_WIN else sys.executable
)
# TODO: allow specifying remote env
self.python_binary = sys.executable
self._error_exec = []

# remote file system obj outside runpath that needs to be cleaned upon
# exit when clean_remote is True, otherwise it will break workspace
# detect etc. in next run
self._dangling_remote_fs_obj = None

@property
def error_exec(self) -> list:
    """Return the ``_error_exec`` list kept by this resource."""
    return self._error_exec
Expand All @@ -198,10 +213,8 @@ def _prepare_remote(self) -> None:
def _define_remote_dirs(self) -> None:
"""Define mandatory directories in remote host."""

self._remote_plan_runpath = self.cfg.remote_runpath or (
f"/var/tmp/{getpass.getuser()}/testplan/{self._get_plan().cfg.name}"
if IS_WIN
else self._get_plan().runpath
self._remote_plan_runpath = (
self.cfg.remote_runpath or self._get_plan().runpath
)
self._workspace_paths.local = fix_home_prefix(
os.path.abspath(self.cfg.workspace)
Expand Down Expand Up @@ -252,8 +265,6 @@ def _remote_working_dir(self) -> None:
def _create_remote_dirs(self) -> None:
"""Create mandatory directories in remote host."""

exist_on_remote = self._check_workspace()

if 0 != self._execute_cmd_remote(
cmd=filepath_exist_cmd(self._remote_runid_file),
label="runid file availability check",
Expand All @@ -275,9 +286,15 @@ def _create_remote_dirs(self) -> None:
label="create remote runid file",
)

self._prepare_workspace(exist_on_remote)
# TODO: remote venv setup
# NOTE: curr strategy is always do the copy/link, to handle the
# NOTE: case that testplan path under workspace, we deal with it
# NOTE: first
self._copy_testplan_package()

exist_on_remote = self._check_workspace()
self._prepare_workspace(exist_on_remote)

self._execute_cmd_remote(
cmd=mkdir_cmd(self._remote_resource_runpath),
label="create remote resource runpath",
Expand Down Expand Up @@ -382,6 +399,7 @@ def _prepare_workspace(self, exist_on_remote: bool) -> None:
self,
self.ssh_cfg["host"],
)
# proposed: if clobber_remote_workspace set, overwrite remote
self._execute_cmd_remote(
cmd=link_cmd(
path=self._workspace_paths.local,
Expand All @@ -400,7 +418,7 @@ def _prepare_workspace(self, exist_on_remote: bool) -> None:
)
self._transfer_data(
# join with "" to add trailing "/" to source
# this will copy everything under local import path to to testplan_lib
# this will copy everything under local workspace path to fetched_workspace
source=os.path.join(self._workspace_paths.local, ""),
target=self._workspace_paths.remote,
remote_target=True,
Expand All @@ -412,19 +430,21 @@ def _prepare_workspace(self, exist_on_remote: bool) -> None:
self,
self.ssh_cfg["host"],
)
# TODO: possibly dangling dirs never get removed
self._execute_cmd_remote(
cmd=mkdir_cmd(os.path.dirname(self._workspace_paths.local)),
label="imitate local workspace path on remote - mkdir",
check=False, # just best effort
)
self._execute_cmd_remote(
if 0 == self._execute_cmd_remote(
cmd=link_cmd(
path=self._workspace_paths.remote,
link=self._workspace_paths.local,
),
label="imitate local workspace path on remote - ln",
check=False, # just best effort
)
):
self._dangling_remote_fs_obj = self._workspace_paths.local

def _push_files(self) -> None:
"""Push files and directories to remote host."""
Expand Down Expand Up @@ -601,10 +621,21 @@ def _clean_remote(self) -> None:
)

self._execute_cmd_remote(
cmd=f"/bin/rm -rf {self._remote_plan_runpath}",
cmd=rm_cmd(self._remote_plan_runpath),
label="Clean remote root runpath",
)

if self._dangling_remote_fs_obj:
self._execute_cmd_remote(
cmd=rm_cmd(self._dangling_remote_fs_obj),
label=f"Remove imitated workspace outside runpath",
)
self._dangling_remote_fs_obj = None

if self.cfg.delete_pushed:
# TODO
...

def _pull_files(self) -> None:
"""Pull custom files from remote host."""

Expand Down
3 changes: 1 addition & 2 deletions testplan/common/remote/remote_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,8 @@ def stopping(self) -> None:
"""
Stops remote rpyc process.
"""
remote_pid = self.rpyc_connection.modules.os.getpid()
try:
self.rpyc_connection.modules.os.kill(remote_pid, signal.SIGTERM)
self.rpyc_connection.modules.os.kill(self.rpyc_pid, signal.SIGTERM)
except EOFError:
pass

Expand Down
2 changes: 1 addition & 1 deletion testplan/common/utils/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def rebase_path(path, old_base, new_base):
"""

rel_path = os.path.relpath(path, old_base).split(os.sep)
return "/".join([new_base] + rel_path)
return os.path.join(new_base, *rel_path)


def is_subdir(child, parent):
Expand Down
13 changes: 7 additions & 6 deletions testplan/common/utils/remote.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Remote execution utilities."""

import getpass
import os
import platform
import shlex
import socket
import sys
import getpass
import subprocess
import sys

IS_WIN = platform.system() == "Windows"
USER = getpass.getuser()
Expand Down Expand Up @@ -119,19 +120,19 @@ def copy_cmd(source, target, exclude=None, port=None, deref_links=False):

def link_cmd(path, link):
    """
    Returns link creation command.

    Both arguments are shell-quoted so that paths containing spaces or
    shell metacharacters stay single arguments.

    :param path: target the symlink points to
    :param link: location of the symlink to create or replace
    :return: command string
    """
    return " ".join(["/bin/ln", "-sfn", shlex.quote(path), shlex.quote(link)])


def mkdir_cmd(path):
    """
    Return mkdir command.

    The path is shell-quoted so that spaces or shell metacharacters in it
    cannot split or alter the command.

    :param path: directory to create, parents included
    :return: command string
    """
    return " ".join(["/bin/mkdir", "-p", shlex.quote(path)])


def rm_cmd(path):
    """
    Return rm command.

    The path is shell-quoted; with ``rm -rf`` an unquoted path containing
    spaces or metacharacters could remove unintended targets.

    :param path: file system object to remove recursively
    :return: command string
    """
    return " ".join(["/bin/rm", "-rf", shlex.quote(path)])


def filepath_exist_cmd(path):
    """
    Checks if filepath exists.

    The path is shell-quoted so that spaces or shell metacharacters in it
    cannot split or alter the command.

    :param path: path to check
    :return: command string that exits with 0 when the path exists
    """
    # Plain ``test`` rather than ``/bin/test``: it is a shell builtin, while
    # the standalone binary lives under /usr/bin on systems without a merged
    # /usr, where an absolute /bin path would fail.
    return " ".join(["test", "-e", shlex.quote(path)])
2 changes: 1 addition & 1 deletion testplan/exporters/testing/json/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def export(
result = None
json_path = pathlib.Path(self.cfg.json_path).resolve()

if len(source):
if not source.is_empty():
json_path.parent.mkdir(parents=True, exist_ok=True)

test_plan_schema = TestReportSchema()
Expand Down
2 changes: 1 addition & 1 deletion testplan/exporters/testing/pdf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def export(
exporter=self, export_context=export_context
)
result = None
if len(source):
if not source.is_empty():
pdf_path = self.create_pdf(source)
self.logger.user_info("PDF generated at %s", pdf_path)

Expand Down
2 changes: 1 addition & 1 deletion testplan/exporters/testing/webserver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def export(
exporter=self, export_context=export_context
)
result = None
if len(source):
if not source.is_empty():
exporter = JSONExporter(
json_path=self.cfg.json_path, split_json_report=False
)
Expand Down
51 changes: 49 additions & 2 deletions testplan/runnable/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import random
import re
import time
import traceback
import uuid
import webbrowser
from collections import OrderedDict
Expand All @@ -25,6 +26,7 @@
Union,
)

import exceptiongroup
from schema import And, Or, Use
from testplan import defaults
from testplan.common.config import ConfigOption
Expand Down Expand Up @@ -581,11 +583,53 @@ def add_remote_service(self, remote_service: "RemoteService"):
remote_service.parent = self
remote_service.cfg.parent = self.cfg
self.remote_services[name] = remote_service
remote_service.start()

def skip_step(self, step):
    """
    Tell whether ``step`` should be skipped.

    Once the recorded result of the ``_start_remote_services`` step is an
    exception, every step is skipped except the exporter steps and the
    remote service shutdown step.

    :param step: step callable about to be executed
    :return: ``True`` if the step must not run
    """
    start_outcome = self.result.step_results.get(
        "_start_remote_services", None
    )
    if not isinstance(start_outcome, Exception):
        return False

    # these still have to run so a report is produced and services are closed
    always_run = (
        self._pre_exporters,
        self._invoke_exporters,
        self._post_exporters,
        self._stop_remote_services,
    )
    return step not in always_run


def _start_remote_services(self):
    """
    Start every registered remote service, stopping at the first failure.

    On failure the traceback is logged to both the runner logger and the
    report logger, the report status is overridden to ``Status.ERROR`` and
    the exception is returned instead of raised, so it becomes the value
    of this step.

    :return: the exception raised by a service, otherwise ``None``
    """
    for service in self.remote_services.values():
        try:
            service.start()
        except Exception as exc:
            trace = traceback.format_exc()
            self.logger.error(trace)
            self.report.logger.error(trace)
            self.report.status_override = Status.ERROR
            # remaining services are not started; hand the error back
            return exc


def _stop_remote_services(self):
    """
    Stop every registered remote service, collecting failures.

    Each service is stopped exactly once inside the guard, so one failing
    service does not prevent the others from being stopped.

    :return: ``None`` when all services stopped, the single exception when
        one failed, or an ``ExceptionGroup`` when several failed
    """
    es = []
    for rmt_svc in self.remote_services.values():
        try:
            rmt_svc.stop()
        except Exception as e:
            msg = traceback.format_exc()
            self.logger.error(msg)
            # NOTE: rmt svc cannot be closed before report export due to
            # NOTE: rmt ref being used during report export, it's
            # NOTE: meaningless to update report obj here
            self.report.status_override = Status.ERROR
            es.append(e)
    if es:
        if len(es) > 1:
            return exceptiongroup.ExceptionGroup(
                "multiple remote services failed to stop", es
            )
        return es[0]


def _clone_task_for_part(self, task_info, _task_arguments, part):
_task_arguments["part"] = part
Expand Down Expand Up @@ -1120,6 +1164,7 @@ def add_pre_resource_steps(self):
"""Runnable steps to be executed before resources started."""
self._add_step(self.timer.start, "run")
super(TestRunner, self).add_pre_resource_steps()
self._add_step(self._start_remote_services)
self._add_step(self.make_runpath_dirs)
self._add_step(self._configure_file_logger)
self._add_step(self.calculate_pool_size)
Expand Down Expand Up @@ -1460,6 +1505,8 @@ def aborting(self):
"""Stop the web server if it is running."""
if self._web_server_thread is not None:
self._web_server_thread.stop()
# XXX: to be refactored after aborting logic implemented for rmt svcs
self._stop_remote_services()
self._stop_resource_monitor()
self._close_file_logger()

Expand Down
Loading

0 comments on commit 050a1eb

Please sign in to comment.