Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: move submit out of the http_server.py #37

Merged
merged 4 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/openjd/adaptor_runtime/_background/backend_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from types import FrameType
from typing import Optional, Union

from .server_response import ServerResponseGenerator
from .._osname import OSName
from ..adaptors import AdaptorRunner
from .._http import SocketDirectories
Expand Down Expand Up @@ -53,14 +54,10 @@ def _sigint_handler(self, signum: int, frame: Optional[FrameType]) -> None:
_logger.info("Interruption signal recieved.")
# OpenJD dictates that a SIGTERM/SIGINT results in a cancel workflow being
# kicked off.
# TODO: Do a code refactoring to move the `submit` to the `server_response`
if OSName.is_posix():
if self._server is not None:
self._server.submit( # type: ignore
self._adaptor_runner._cancel, force_immediate=True
)
else:
raise NotImplementedError("Signal is not implemented in Windows.")
if self._server is not None:
ServerResponseGenerator.submit_task(
self._server, self._adaptor_runner._cancel, force_immediate=True
)

def run(self) -> None:
"""
Expand Down
21 changes: 0 additions & 21 deletions src/openjd/adaptor_runtime/_background/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
import json
import logging
import socketserver
from http import HTTPStatus
from threading import Event
from typing import Callable
from .._osname import OSName
from .server_response import ServerResponseGenerator, AsyncFutureRunner
from ..adaptors import AdaptorRunner
Expand Down Expand Up @@ -47,25 +45,6 @@ def __init__(
self._future_runner = AsyncFutureRunner()
self._log_buffer = log_buffer

def submit(self, fn: Callable, *args, force_immediate=False, **kwargs) -> HTTPResponse:
"""
Submits work to the server.

Args:
force_immediate (bool): Force the server to immediately start the work. This work will
be performed concurrently with any ongoing work.
"""
future_runner = self._future_runner if not force_immediate else AsyncFutureRunner()
try:
future_runner.submit(fn, *args, **kwargs)
except Exception as e:
_logger.error(f"Failed to submit work: {e}")
return HTTPResponse(HTTPStatus.INTERNAL_SERVER_ERROR, body=str(e))

# Wait for the worker thread to start working before sending the response
self._future_runner.wait_for_start()
return HTTPResponse(HTTPStatus.OK)


class BackgroundRequestHandler(RequestHandler):
"""
Expand Down
54 changes: 43 additions & 11 deletions src/openjd/adaptor_runtime/_background/server_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,13 @@ def __init__(
self.body = body
self.query_string_params = query_string_params

def generate_cancel_put_response(self) -> HTTPResponse:
def generate_cancel_put_response(self) -> Union[HTTPResponse | None]:
"""
Handle PUT request to /cancel path.

Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
if not (
self.server._future_runner.is_running
Expand All @@ -116,7 +117,9 @@ def generate_cancel_put_response(self) -> HTTPResponse:
force_immediate=True,
)

def generate_heartbeat_get_response(self, parse_ack_id_fn: Callable) -> HTTPResponse:
def generate_heartbeat_get_response(
self, parse_ack_id_fn: Callable
) -> Union[HTTPResponse | None]:
Honglichenn marked this conversation as resolved.
Show resolved Hide resolved
"""
Handle Get request to /heartbeat path.

Expand All @@ -125,6 +128,7 @@ def generate_heartbeat_get_response(self, parse_ack_id_fn: Callable) -> HTTPResp

Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
# Failure messages are in the form: "<log-level>: openjd_fail: <message>"
_FAILURE_REGEX = f"^(?:\\w+: )?{re.escape(_OPENJD_FAIL_STDOUT_PREFIX)}"
Expand Down Expand Up @@ -161,17 +165,19 @@ def generate_shutdown_put_response(self) -> HTTPResponse:

Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""

self.server._shutdown_event.set()
return self.response_method(HTTPStatus.OK)

def generate_run_put_response(self) -> HTTPResponse:
def generate_run_put_response(self) -> Union[HTTPResponse | None]:
"""
Handle Put request to /run path.

Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
if self.server._future_runner.is_running:
return self.response_method(HTTPStatus.BAD_REQUEST)
Expand All @@ -181,7 +187,7 @@ def generate_run_put_response(self) -> HTTPResponse:
self.body if self.body else {},
)

def generate_start_put_response(self) -> HTTPResponse:
def generate_start_put_response(self) -> Union[HTTPResponse | None]:
Honglichenn marked this conversation as resolved.
Show resolved Hide resolved
"""
Handle Put request to /start path.

Expand All @@ -193,12 +199,13 @@ def generate_start_put_response(self) -> HTTPResponse:

return self.submit(self.server._adaptor_runner._start)

def generate_stop_put_response(self) -> HTTPResponse:
def generate_stop_put_response(self) -> Union[HTTPResponse | None]:
"""
Handle Put request to /stop path.

Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""

if self.server._future_runner.is_running:
Expand All @@ -216,24 +223,49 @@ def _stop_adaptor(self) -> None: # pragma: no cover
finally:
self.server._adaptor_runner._cleanup()

def submit(self, fn: Callable, *args, force_immediate=False, **kwargs) -> HTTPResponse:
@staticmethod
def submit_task(
server: Union[BackgroundHTTPServer, WinBackgroundNamedPipeServer],
fn: Callable,
*args,
force_immediate=False,
**kwargs,
):
"""
Submits work to the server.

Args:
force_immediate (bool): Force the server to immediately start the work. This work will
be performed concurrently with any ongoing work.
"""
future_runner = server._future_runner if not force_immediate else AsyncFutureRunner()
try:
future_runner.submit(fn, *args, **kwargs)
except Exception as e:
_logger.error(f"Failed to submit work: {e}")
raise e
# Wait for the worker thread to start working before sending the response
server._future_runner.wait_for_start()
AWS-Samuel marked this conversation as resolved.
Show resolved Hide resolved

def submit(
self, fn: Callable, *args, force_immediate=False, **kwargs
) -> Union[HTTPResponse | None]:
"""
Submits work to the server and generate a response for the client.

Args:
force_immediate (bool): Force the server to immediately start the work. This work will
be performed concurrently with any ongoing work.

Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
future_runner = self.server._future_runner if not force_immediate else AsyncFutureRunner()
try:
future_runner.submit(fn, *args, **kwargs)
ServerResponseGenerator.submit_task(
self.server, fn, *args, force_immediate=force_immediate, **kwargs
)
except Exception as e:
_logger.error(f"Failed to submit work: {e}")
return self.response_method(HTTPStatus.INTERNAL_SERVER_ERROR, body=str(e))

# Wait for the worker thread to start working before sending the response
self.server._future_runner.wait_for_start()
return self.response_method(HTTPStatus.OK)
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ def test_run_raises_when_writing_connection_file_fails(
mock_os_remove.assert_has_calls([call(conn_file_path), call(socket_path)])

@patch.object(backend_runner.signal, "signal")
def test_signal_hook(self, signal_mock: MagicMock) -> None:
@patch.object(backend_runner.ServerResponseGenerator, "submit_task")
def test_signal_hook(self, mock_submit, signal_mock: MagicMock) -> None:
# Test that we create the signal hook, and that it initiates a cancelation
# as expected.

Expand All @@ -171,4 +172,4 @@ def test_signal_hook(self, signal_mock: MagicMock) -> None:
# THEN
signal_mock.assert_any_call(signal.SIGINT, runner._sigint_handler)
signal_mock.assert_any_call(signal.SIGTERM, runner._sigint_handler)
submit_mock.assert_called_with(adaptor_runner._cancel, force_immediate=True)
mock_submit.assert_called_with(server_mock, adaptor_runner._cancel, force_immediate=True)
54 changes: 0 additions & 54 deletions test/openjd/adaptor_runtime/unit/background/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,60 +150,6 @@ def test_wait_for_start(self, mock_has_started, mock_sleep):
assert mock_sleep.called_once_with(AsyncFutureRunner._WAIT_FOR_START_INTERVAL)


class TestBackgroundHTTPServer:
"""
Tests for the BackgroundHTTPServer class
"""

class TestSubmit:
"""
Tests for the BackgroundHTTPServer.submit method
"""

def test_submits_work(self):
# GIVEN
def my_fn():
pass

args = ("one", "two")
kwargs = {"three": 3, "four": 4}

mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_future_runner = MagicMock()
mock_server._future_runner = mock_future_runner

# WHEN
result = BackgroundHTTPServer.submit(mock_server, my_fn, *args, **kwargs)

# THEN
mock_future_runner.submit.assert_called_once_with(my_fn, *args, **kwargs)
mock_future_runner.wait_for_start.assert_called_once()
assert result.status == HTTPStatus.OK

def test_returns_500_if_fails_to_submit_work(self, caplog: pytest.LogCaptureFixture):
# GIVEN
def my_fn():
pass

args = ("one", "two")
kwargs = {"three": 3, "four": 4}

mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_future_runner = MagicMock()
exc = Exception()
mock_future_runner.submit.side_effect = exc
mock_server._future_runner = mock_future_runner

# WHEN
result = BackgroundHTTPServer.submit(mock_server, my_fn, *args, **kwargs)

# THEN
mock_future_runner.submit.assert_called_once_with(my_fn, *args, **kwargs)
assert result.status == HTTPStatus.INTERNAL_SERVER_ERROR
assert result.body == str(exc)
assert "Failed to submit work: " in caplog.text


class TestBackgroundRequestHandler:
"""
Tests for the BackgroundRequestHandler class
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from unittest.mock import MagicMock
import pytest
from openjd.adaptor_runtime._background.server_response import ServerResponseGenerator
from openjd.adaptor_runtime._background.http_server import BackgroundHTTPServer
from http import HTTPStatus


class TestServerResponseGenerator:
def test_submits_work(self):
# GIVEN
def my_fn():
pass

args = ("one", "two")
kwargs = {"three": 3, "four": 4}

mock_future_runner = MagicMock()
mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_server._future_runner = mock_future_runner
mock_response_method = MagicMock()
mock_server_response = MagicMock()
mock_server_response.server = mock_server
mock_server_response.response_method = mock_response_method

# WHEN
ServerResponseGenerator.submit(mock_server_response, my_fn, *args, **kwargs)

# THEN
mock_future_runner.submit.assert_called_once_with(my_fn, *args, **kwargs)
mock_future_runner.wait_for_start.assert_called_once()
# assert mock_response_method.assert_called_once_with(HTTPStatus.OK)
mock_response_method.assert_called_once_with(HTTPStatus.OK)

def test_returns_500_if_fails_to_submit_work(self, caplog: pytest.LogCaptureFixture):
# GIVEN
def my_fn():
pass

args = ("one", "two")
kwargs = {"three": 3, "four": 4}

mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_future_runner = MagicMock()
exc = Exception()
mock_future_runner.submit.side_effect = exc
mock_server._future_runner = mock_future_runner
mock_response_method = MagicMock()
mock_server_response = MagicMock()
mock_server_response.server = mock_server
mock_server_response.response_method = mock_response_method

# WHEN
ServerResponseGenerator.submit(mock_server_response, my_fn, *args, **kwargs)

# THEN
mock_future_runner.submit.assert_called_once_with(my_fn, *args, **kwargs)
mock_response_method.assert_called_once_with(
HTTPStatus.INTERNAL_SERVER_ERROR, body=str(exc)
)

assert "Failed to submit work: " in caplog.text