forked from hatchet-dev/hatchet-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconftest.py
59 lines (42 loc) · 1.62 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import logging
import subprocess
import time
from io import BytesIO
from threading import Thread
from typing import AsyncGenerator, Callable, cast
import psutil
import pytest
import pytest_asyncio
from hatchet_sdk import Hatchet
@pytest_asyncio.fixture(scope="session")
async def aiohatchet() -> AsyncGenerator[Hatchet, None]:
    """Yield a debug-enabled Hatchet client for async tests.

    The fixture is session-scoped, so the client is constructed once and the
    same instance is handed to every test that requests it.
    """
    client = Hatchet(debug=True)
    yield client
@pytest.fixture(scope="session")
def hatchet() -> Hatchet:
    """Return a debug-enabled Hatchet client for synchronous tests.

    The fixture is session-scoped, so the client is constructed once and the
    same instance is handed to every test that requests it.
    """
    client = Hatchet(debug=True)
    return client
def _forward_output(pipe: BytesIO, log_func: Callable[[str], None]) -> None:
    """Relay every line read from *pipe* to *log_func* until EOF."""
    for line in iter(pipe.readline, b""):
        # Worker output is arbitrary bytes; a strict decode would raise on
        # invalid UTF-8, kill this thread and leave the pipe undrained.
        log_func(line.decode(errors="replace").strip())


def _terminate_process_tree(pid: int, timeout: float = 3) -> None:
    """Terminate process *pid* and all its descendants, killing any survivors.

    Processes that have already exited are skipped silently, so this is safe
    to call even when the worker died during the test.
    """
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        # The worker is already gone; there is nothing left to clean up.
        return

    # Ask the children to stop first, then the parent (same order as before).
    for target in [*children, parent]:
        try:
            target.terminate()
        except psutil.NoSuchProcess:
            continue

    _, alive = psutil.wait_procs([parent] + children, timeout=timeout)
    for p in alive:
        logging.warning(f"Force killing process {p.pid}")
        try:
            p.kill()
        except psutil.NoSuchProcess:
            # Exited between wait_procs() returning and the kill() call.
            continue


@pytest.fixture()
def worker(request: pytest.FixtureRequest):
    """Run an example worker as a background process for the duration of a test.

    ``request.param`` must hold the name of the command to launch; it is
    executed as ``poetry run <param>``. The worker's stdout and stderr are
    forwarded line by line to ``logging.info`` and ``logging.error``.

    Yields:
        The ``subprocess.Popen`` object of the running worker.

    Raises:
        Exception: if the worker process has already exited once the
            start-up grace period is over.
    """
    example = cast(str, request.param)
    command = ["poetry", "run", example]

    logging.info(f"Starting background worker: {' '.join(command)}")
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Drain the pipes from the very start: a worker that logs heavily while
    # booting would otherwise fill the OS pipe buffer and block.
    Thread(target=_forward_output, args=(proc.stdout, logging.info), daemon=True).start()
    Thread(target=_forward_output, args=(proc.stderr, logging.error), daemon=True).start()

    # Fixed grace period for the worker to boot.
    time.sleep(5)

    # Check liveness *after* the grace period; polling right after Popen()
    # would almost never observe a start-up crash.
    if proc.poll() is not None:
        raise Exception(f"Worker failed to start with return code {proc.returncode}")

    try:
        yield proc
    finally:
        logging.info("Cleaning up background worker")
        _terminate_process_tree(proc.pid)