diff --git a/testing/conftest.py b/testing/conftest.py index 4ea857a9..066df23a 100644 --- a/testing/conftest.py +++ b/testing/conftest.py @@ -1,7 +1,8 @@ -import pathlib import shutil -import subprocess import sys +from functools import lru_cache +from typing import Callable +from typing import Iterator import execnet import pytest @@ -23,10 +24,15 @@ def pytest_runtest_setup(item): @pytest.fixture -def makegateway(request): +def group_function() -> Iterator[execnet.Group]: group = execnet.Group() - request.addfinalizer(lambda: group.terminate(0.5)) - return group.makegateway + yield group + group.terminate(0.5) + + +@pytest.fixture +def makegateway(group_function) -> Callable[[str], execnet.gateway.Gateway]: + return group_function.makegateway pytest_plugins = ["pytester", "doctest"] @@ -101,37 +107,16 @@ def pytest_generate_tests(metafunc): else: gwtypes = ["popen", "socket", "ssh", "proxy"] metafunc.parametrize("gw", gwtypes, indirect=True) - elif "anypython" in metafunc.fixturenames: - metafunc.parametrize( - "anypython", - indirect=True, - argvalues=("sys.executable", "pypy3"), - ) -def getexecutable(name, cache={}): - try: - return cache[name] - except KeyError: - if name == "sys.executable": - return pathlib.Path(sys.executable) - path = shutil.which(name) - executable = pathlib.Path(path) if path is not None else None - if executable: - if name == "jython": - popen = subprocess.Popen( - [str(executable), "--version"], - universal_newlines=True, - stderr=subprocess.PIPE, - ) - out, err = popen.communicate() - if not err or "2.5" not in err: - executable = None - cache[name] = executable - return executable +@lru_cache() +def getexecutable(name): + if name == "sys.executable": + return sys.executable + return shutil.which(name) -@pytest.fixture +@pytest.fixture(params=("sys.executable", "pypy3")) def anypython(request): name = request.param executable = getexecutable(name) diff --git a/testing/test_basics.py b/testing/test_basics.py index 141f7d3f..5820c492 100644 --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -1,9 +1,14 @@ +from __future__ import annotations + import inspect import os import subprocess import sys import textwrap +from dataclasses import dataclass from io import BytesIO +from pathlib import Path +from typing import Any import execnet import pytest @@ -21,9 +26,8 @@ ) +@pytest.mark.parametrize("val", ["123", 42, [1, 2, 3], ["23", 25]]) class TestSerializeAPI: - pytestmark = [pytest.mark.parametrize("val", ["123", 42, [1, 2, 3], ["23", 25]])] - def test_serializer_api(self, val): dumped = execnet.dumps(val) val2 = execnet.loads(dumped) @@ -31,9 +35,9 @@ def test_serializer_api(self, val): def test_mmap(self, tmp_path, val): mmap = pytest.importorskip("mmap").mmap - p = tmp_path / "data" - with p.open("wb") as f: - f.write(execnet.dumps(val)) + p = tmp_path / "data.bin" + + p.write_bytes(execnet.dumps(val)) with p.open("r+b") as f: m = mmap(f.fileno(), 0) val2 = execnet.load(m) @@ -112,67 +116,83 @@ def read_write_loop(): break -def test_io_message(anypython, tmp_path, execmodel): - check = tmp_path / "check.py" - check.write_text( - inspect.getsource(gateway_base) - + textwrap.dedent( - """ - from io import BytesIO - import tempfile - temp_out = BytesIO() - temp_in = BytesIO() - io = Popen2IO(temp_out, temp_in, get_execmodel({backend!r})) - for i, handler in enumerate(Message._types): - print ("checking %s %s" %(i, handler)) - for data in "hello", "hello".encode('ascii'): - msg1 = Message(i, i, dumps(data)) - msg1.to_io(io) - x = io.outfile.getvalue() - io.outfile.truncate(0) - io.outfile.seek(0) - io.infile.seek(0) - io.infile.write(x) - io.infile.seek(0) - msg2 = Message.from_io(io) - assert msg1.channelid == msg2.channelid, (msg1, msg2) - assert msg1.data == msg2.data, (msg1.data, msg2.data) - assert msg1.msgcode == msg2.msgcode - print ("all passed") - """.format( - backend=execmodel.backend - ), +IO_MESSAGE_EXTRA_SOURCE = """ +import sys +backend = sys.argv[1] +try: + from io import BytesIO +except ImportError: + from StringIO import StringIO as BytesIO +import tempfile +temp_out = BytesIO() +temp_in = BytesIO() +io = Popen2IO(temp_out, temp_in, get_execmodel(backend)) +for i, handler in enumerate(Message._types): + print ("checking", i, handler) + for data in "hello", "hello".encode('ascii'): + msg1 = Message(i, i, dumps(data)) + msg1.to_io(io) + x = io.outfile.getvalue() + io.outfile.truncate(0) + io.outfile.seek(0) + io.infile.seek(0) + io.infile.write(x) + io.infile.seek(0) + msg2 = Message.from_io(io) + assert msg1.channelid == msg2.channelid, (msg1, msg2) + assert msg1.data == msg2.data, (msg1.data, msg2.data) + assert msg1.msgcode == msg2.msgcode +print ("all passed") +""" + + +@dataclass +class Checker: + python: str + path: Path + idx: int = 0 + + def run_check( + self, script: str, *extra_args: str, **process_args: Any + ) -> subprocess.CompletedProcess[str]: + self.idx += 1 + check_path = self.path / f"check{self.idx}.py" + check_path.write_text(script) + return subprocess.run( + [self.python, os.fspath(check_path), *extra_args], + capture_output=True, + text=True, + check=True, + **process_args, ) + + +@pytest.fixture +def checker(anypython: str, tmp_path: Path) -> Checker: + return Checker(python=anypython, path=tmp_path) + + +def test_io_message(checker, execmodel): + out = checker.run_check( + inspect.getsource(gateway_base) + IO_MESSAGE_EXTRA_SOURCE, execmodel.backend ) - out = subprocess.run( - [str(anypython), str(check)], text=True, capture_output=True, check=True - ).stdout - print(out) - assert "all passed" in out + print(out.stdout) + assert "all passed" in out.stdout -def test_popen_io(anypython, tmp_path, execmodel): - check = tmp_path / "check.py" - check.write_text( +def test_popen_io(checker, execmodel): + out = checker.run_check( inspect.getsource(gateway_base) - + textwrap.dedent( - f""" - io = init_popen_io(get_execmodel({execmodel.backend!r})) - io.write("hello".encode('ascii')) - s = io.read(1) - assert s == "x".encode('ascii') - """ - ), + + f""" +io = init_popen_io(get_execmodel({execmodel.backend!r})) +io.write(b"hello") +s = io.read(1) +assert s == b"x" +""", + input="x", ) - from subprocess import Popen, PIPE - - args = [str(anypython), str(check)] - proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) - proc.stdin.write(b"x") - stdout, stderr = proc.communicate() - print(stderr) - proc.wait() - assert b"hello" in stdout + print(out.stderr) + assert "hello" in out.stdout def test_popen_io_readloop(monkeypatch, execmodel): @@ -190,60 +210,45 @@ def newread(numbytes): assert result == b"tes" -def test_rinfo_source(anypython, tmp_path): - check = tmp_path / "check.py" - check.write_text( - textwrap.dedent( - """ - class Channel: - def send(self, data): - assert eval(repr(data), {}) == data - channel = Channel() - """ - ) - + inspect.getsource(gateway.rinfo_source) - + textwrap.dedent( - """ - print ('all passed') - """ - ) +def test_rinfo_source(checker): + out = checker.run_check( + f""" +class Channel: + def send(self, data): + assert eval(repr(data), {{}}) == data +channel = Channel() +{inspect.getsource(gateway.rinfo_source)} +print ('all passed') +""" ) - out = subprocess.run( - [str(anypython), str(check)], text=True, capture_output=True, check=True - ).stdout - print(out) - assert "all passed" in out + print(out.stdout) + assert "all passed" in out.stdout -def test_geterrortext(anypython, tmp_path): - check = tmp_path / "check.py" - check.write_text( + +def test_geterrortext(checker): + out = checker.run_check( inspect.getsource(gateway_base) - + textwrap.dedent( - """ - class Arg: - pass - errortext = geterrortext((Arg, "1", 4)) - assert "Arg" in errortext - import sys - try: - raise ValueError("17") - except ValueError: - excinfo = sys.exc_info() - s = geterrortext(excinfo) - assert "17" in s - print ("all passed") + + """ +class Arg: + pass +errortext = geterrortext((Arg, "1", 4)) +assert "Arg" in errortext +import sys +try: + raise ValueError("17") +except ValueError: + excinfo = sys.exc_info() + s = geterrortext(excinfo) + assert "17" in s + print ("all passed") """ - ) ) - out = subprocess.run( - [str(anypython), str(check)], text=True, capture_output=True, check=True - ).stdout - print(out) - assert "all passed" in out + print(out.stdout) + assert "all passed" in out.stdout -@pytest.mark.skipif(not hasattr(os, "dup"), reason="no os.dup") +@pytest.mark.skipif("not hasattr(os, 'dup')") def test_stdouterrin_setnull(execmodel, capfd): gateway_base.init_popen_io(execmodel) os.write(1, b"hello") @@ -297,15 +302,15 @@ def test_wire_protocol(self): class TestPureChannel: @pytest.fixture def fac(self, execmodel): - class Gateway: + class FakeGateway: def _trace(self, *args): pass def _send(self, *k): pass - Gateway.execmodel = execmodel - return ChannelFactory(Gateway()) + FakeGateway.execmodel = execmodel + return ChannelFactory(FakeGateway()) def test_factory_create(self, fac): chan1 = fac.new() diff --git a/testing/test_rsync.py b/testing/test_rsync.py index b8d39f86..fefd9a06 100644 --- a/testing/test_rsync.py +++ b/testing/test_rsync.py @@ -2,6 +2,7 @@ import pathlib import platform import sys +import types import execnet import pytest @@ -36,19 +37,22 @@ def gw2(request, group): ) -@pytest.fixture -def dirs(request, tmp_path): - t = tmp_path +class _dirs(types.SimpleNamespace): + source: pathlib.Path + dest1: pathlib.Path + dest2: pathlib.Path - class dirs: - source = t / "source" - dest1 = t / "dest1" - dest2 = t / "dest2" +@pytest.fixture +def dirs(request, tmp_path) -> _dirs: + dirs = _dirs( + source=tmp_path / "source", + dest1=tmp_path / "dest1", + dest2=tmp_path / "dest2", + ) dirs.source.mkdir() dirs.dest1.mkdir() dirs.dest2.mkdir() - return dirs diff --git a/testing/test_serializer.py b/testing/test_serializer.py index f71486e7..621816b1 100644 --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -1,99 +1,69 @@ -import pathlib -import shutil +import os import subprocess import sys -import tempfile +from pathlib import Path import execnet import pytest -MINOR_VERSIONS = {"3": "543210", "2": "76"} - -TEMPDIR = None -_py3_wrapper = None - - -def setup_module(mod): - mod.TEMPDIR = pathlib.Path(tempfile.mkdtemp()) - mod._py3_wrapper = PythonWrapper(pathlib.Path(sys.executable)) - - -def teardown_module(mod): - shutil.rmtree(TEMPDIR) - - -# we use the execnet folder in order to avoid triggering a missing apipkg -pyimportdir = str(pathlib.Path(execnet.__file__).parent) +# We use the execnet folder in order to avoid triggering a missing apipkg. +pyimportdir = os.fspath(Path(execnet.__file__).parent) class PythonWrapper: - def __init__(self, executable): + def __init__(self, executable, tmp_path): self.executable = executable + self.tmp_path = tmp_path - def dump(self, obj_rep): - script_file = TEMPDIR.joinpath("dump.py") + def dump(self, obj_rep: str) -> bytes: + script_file = self.tmp_path.joinpath("dump.py") script_file.write_text( - """ + f""" import sys -sys.path.insert(0, %r) +sys.path.insert(0, {pyimportdir!r}) import gateway_base as serializer -# Need binary output sys.stdout = sys.stdout.detach() -sys.stdout.write(serializer.dumps_internal(%s)) +sys.stdout.write(serializer.dumps_internal({obj_rep})) """ - % (pyimportdir, obj_rep) ) - popen = subprocess.Popen( - [str(self.executable), str(script_file)], - stdin=subprocess.PIPE, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, + res = subprocess.run( + [str(self.executable), str(script_file)], capture_output=True, check=True ) - stdout, stderr = popen.communicate() - ret = popen.returncode - if ret: - raise Exception( - "ExecutionFailed: %d %s\n%s" % (ret, self.executable, stderr) - ) - return stdout - - def load(self, data, option_args="__class__"): - script_file = TEMPDIR.joinpath("load.py") + return res.stdout + + def load(self, data: bytes): + script_file = self.tmp_path.joinpath("load.py") script_file.write_text( - r""" + rf""" import sys -sys.path.insert(0, %r) +sys.path.insert(0, {pyimportdir!r}) import gateway_base as serializer -sys.stdin = sys.stdin.detach() -loader = serializer.Unserializer(sys.stdin) -loader.%s +from io import BytesIO +data = {data!r} +io = BytesIO(data) +loader = serializer.Unserializer(io) obj = loader.load() sys.stdout.write(type(obj).__name__ + "\n") -sys.stdout.write(repr(obj))""" - % (pyimportdir, option_args) +sys.stdout.write(repr(obj)) +""" ) - popen = subprocess.Popen( - [str(self.executable), str(script_file)], - stdin=subprocess.PIPE, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, + res = subprocess.run( + [self.executable, script_file], + capture_output=True, ) - stdout, stderr = popen.communicate(data) - ret = popen.returncode - if ret: - raise Exception( - "ExecutionFailed: %d %s\n%s" % (ret, self.executable, stderr) - ) - return [s.decode("ascii") for s in stdout.splitlines()] + if res.returncode: + raise ValueError(res.stderr) + + return res.stdout.decode("ascii").splitlines() def __repr__(self): return f"" @pytest.fixture -def py3(request): - return _py3_wrapper +def py3(request, tmp_path): + return PythonWrapper(sys.executable, tmp_path) @pytest.fixture diff --git a/testing/test_termination.py b/testing/test_termination.py index 0df29a76..282c1979 100644 --- a/testing/test_termination.py +++ b/testing/test_termination.py @@ -66,11 +66,8 @@ def test_termination_on_remote_channel_receive(monkeypatch, makegateway): gw.remote_exec("channel.receive()") gw._group.terminate() command = ["ps", "-p", str(pid)] - popen = subprocess.Popen( - command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True - ) - out, err = popen.communicate() - assert str(pid) not in out, out + output = subprocess.run(command, capture_output=True, text=True) + assert str(pid) not in output.stdout, output def test_close_initiating_remote_no_error(testdir, anypython): diff --git a/testing/test_threadpool.py b/testing/test_threadpool.py index 75df627e..47a226d0 100644 --- a/testing/test_threadpool.py +++ b/testing/test_threadpool.py @@ -1,7 +1,6 @@ import os import sys -import py import pytest from execnet.gateway_base import WorkerPool