Skip to content

Commit

Permalink
refactor!: move submit out of the http_server.py (#37)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: `submit` is removed from BackgroundHTTPServer

Signed-off-by: Hongli Chen <[email protected]>
  • Loading branch information
Honglichenn authored Dec 19, 2023
1 parent 305fcad commit cdc3c7c
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 97 deletions.
13 changes: 5 additions & 8 deletions src/openjd/adaptor_runtime/_background/backend_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from types import FrameType
from typing import Optional, Union

from .server_response import ServerResponseGenerator
from .._osname import OSName
from ..adaptors import AdaptorRunner
from .._http import SocketDirectories
Expand Down Expand Up @@ -53,14 +54,10 @@ def _sigint_handler(self, signum: int, frame: Optional[FrameType]) -> None:
_logger.info("Interruption signal recieved.")
# OpenJD dictates that a SIGTERM/SIGINT results in a cancel workflow being
# kicked off.
# TODO: Do a code refactoring to move the `submit` to the `server_response`
if OSName.is_posix():
if self._server is not None:
self._server.submit( # type: ignore
self._adaptor_runner._cancel, force_immediate=True
)
else:
raise NotImplementedError("Signal is not implemented in Windows.")
if self._server is not None:
ServerResponseGenerator.submit_task(
self._server, self._adaptor_runner._cancel, force_immediate=True
)

def run(self) -> None:
"""
Expand Down
21 changes: 0 additions & 21 deletions src/openjd/adaptor_runtime/_background/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
import json
import logging
import socketserver
from http import HTTPStatus
from threading import Event
from typing import Callable
from .._osname import OSName
from .server_response import ServerResponseGenerator, AsyncFutureRunner
from ..adaptors import AdaptorRunner
Expand Down Expand Up @@ -47,25 +45,6 @@ def __init__(
self._future_runner = AsyncFutureRunner()
self._log_buffer = log_buffer

def submit(self, fn: Callable, *args, force_immediate=False, **kwargs) -> HTTPResponse:
"""
Submits work to the server.
Args:
force_immediate (bool): Force the server to immediately start the work. This work will
be performed concurrently with any ongoing work.
"""
future_runner = self._future_runner if not force_immediate else AsyncFutureRunner()
try:
future_runner.submit(fn, *args, **kwargs)
except Exception as e:
_logger.error(f"Failed to submit work: {e}")
return HTTPResponse(HTTPStatus.INTERNAL_SERVER_ERROR, body=str(e))

# Wait for the worker thread to start working before sending the response
self._future_runner.wait_for_start()
return HTTPResponse(HTTPStatus.OK)


class BackgroundRequestHandler(RequestHandler):
"""
Expand Down
54 changes: 42 additions & 12 deletions src/openjd/adaptor_runtime/_background/server_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from http import HTTPStatus
from typing import Callable, Dict, TYPE_CHECKING, Any, Union
from typing import Callable, Dict, TYPE_CHECKING, Any, Union, Optional

if TYPE_CHECKING:
from .backend_named_pipe_server import WinBackgroundNamedPipeServer
Expand Down Expand Up @@ -98,12 +98,13 @@ def __init__(
self.body = body
self.query_string_params = query_string_params

def generate_cancel_put_response(self) -> HTTPResponse:
def generate_cancel_put_response(self) -> Optional[HTTPResponse]:
"""
Handle PUT request to /cancel path.
Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
if not (
self.server._future_runner.is_running
Expand All @@ -116,7 +117,7 @@ def generate_cancel_put_response(self) -> HTTPResponse:
force_immediate=True,
)

def generate_heartbeat_get_response(self, parse_ack_id_fn: Callable) -> HTTPResponse:
def generate_heartbeat_get_response(self, parse_ack_id_fn: Callable) -> Optional[HTTPResponse]:
"""
Handle Get request to /heartbeat path.
Expand All @@ -125,6 +126,7 @@ def generate_heartbeat_get_response(self, parse_ack_id_fn: Callable) -> HTTPResp
Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
# Failure messages are in the form: "<log-level>: openjd_fail: <message>"
_FAILURE_REGEX = f"^(?:\\w+: )?{re.escape(_OPENJD_FAIL_STDOUT_PREFIX)}"
Expand Down Expand Up @@ -161,17 +163,19 @@ def generate_shutdown_put_response(self) -> HTTPResponse:
Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""

self.server._shutdown_event.set()
return self.response_method(HTTPStatus.OK)

def generate_run_put_response(self) -> HTTPResponse:
def generate_run_put_response(self) -> Optional[HTTPResponse]:
"""
Handle Put request to /run path.
Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
if self.server._future_runner.is_running:
return self.response_method(HTTPStatus.BAD_REQUEST)
Expand All @@ -181,24 +185,26 @@ def generate_run_put_response(self) -> HTTPResponse:
self.body if self.body else {},
)

def generate_start_put_response(self) -> HTTPResponse:
def generate_start_put_response(self) -> Optional[HTTPResponse]:
"""
Handle Put request to /start path.
Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
if self.server._future_runner.is_running:
return self.response_method(HTTPStatus.BAD_REQUEST)

return self.submit(self.server._adaptor_runner._start)

def generate_stop_put_response(self) -> HTTPResponse:
def generate_stop_put_response(self) -> Optional[HTTPResponse]:
"""
Handle Put request to /stop path.
Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""

if self.server._future_runner.is_running:
Expand All @@ -216,24 +222,48 @@ def _stop_adaptor(self) -> None: # pragma: no cover
finally:
self.server._adaptor_runner._cleanup()

def submit(self, fn: Callable, *args, force_immediate=False, **kwargs) -> HTTPResponse:
@staticmethod
def submit_task(
server: Union[BackgroundHTTPServer, WinBackgroundNamedPipeServer],
fn: Callable,
*args,
force_immediate=False,
**kwargs,
):
"""
Submits work to the server.
Args:
force_immediate (bool): Force the server to immediately start the work. This work will
be performed concurrently with any ongoing work.
"""
future_runner = server._future_runner if not force_immediate else AsyncFutureRunner()
try:
future_runner.submit(fn, *args, **kwargs)
except Exception as e:
_logger.error(f"Failed to submit work: {e}")
raise e
future_runner.wait_for_start()

def submit(
self, fn: Callable, *args, force_immediate=False, **kwargs
) -> Optional[HTTPResponse]:
"""
Submits work to the server and generate a response for the client.
Args:
force_immediate (bool): Force the server to immediately start the work. This work will
be performed concurrently with any ongoing work.
Returns:
Linux: return HTTPResponse.
Windows: return None. Response will be sent in self.response_method immediately.
"""
future_runner = self.server._future_runner if not force_immediate else AsyncFutureRunner()
try:
future_runner.submit(fn, *args, **kwargs)
ServerResponseGenerator.submit_task(
self.server, fn, *args, force_immediate=force_immediate, **kwargs
)
except Exception as e:
_logger.error(f"Failed to submit work: {e}")
return self.response_method(HTTPStatus.INTERNAL_SERVER_ERROR, body=str(e))

# Wait for the worker thread to start working before sending the response
self.server._future_runner.wait_for_start()
return self.response_method(HTTPStatus.OK)
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ def test_run_raises_when_writing_connection_file_fails(
mock_os_remove.assert_has_calls([call(conn_file_path), call(socket_path)])

@patch.object(backend_runner.signal, "signal")
def test_signal_hook(self, signal_mock: MagicMock) -> None:
@patch.object(backend_runner.ServerResponseGenerator, "submit_task")
def test_signal_hook(self, mock_submit, signal_mock: MagicMock) -> None:
# Test that we create the signal hook, and that it initiates a cancelation
# as expected.

Expand All @@ -171,4 +172,4 @@ def test_signal_hook(self, signal_mock: MagicMock) -> None:
# THEN
signal_mock.assert_any_call(signal.SIGINT, runner._sigint_handler)
signal_mock.assert_any_call(signal.SIGTERM, runner._sigint_handler)
submit_mock.assert_called_with(adaptor_runner._cancel, force_immediate=True)
mock_submit.assert_called_with(server_mock, adaptor_runner._cancel, force_immediate=True)
54 changes: 0 additions & 54 deletions test/openjd/adaptor_runtime/unit/background/test_http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,60 +150,6 @@ def test_wait_for_start(self, mock_has_started, mock_sleep):
assert mock_sleep.called_once_with(AsyncFutureRunner._WAIT_FOR_START_INTERVAL)


class TestBackgroundHTTPServer:
"""
Tests for the BackgroundHTTPServer class
"""

class TestSubmit:
"""
Tests for the BackgroundHTTPServer.submit method
"""

def test_submits_work(self):
# GIVEN
def my_fn():
pass

args = ("one", "two")
kwargs = {"three": 3, "four": 4}

mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_future_runner = MagicMock()
mock_server._future_runner = mock_future_runner

# WHEN
result = BackgroundHTTPServer.submit(mock_server, my_fn, *args, **kwargs)

# THEN
mock_future_runner.submit.assert_called_once_with(my_fn, *args, **kwargs)
mock_future_runner.wait_for_start.assert_called_once()
assert result.status == HTTPStatus.OK

def test_returns_500_if_fails_to_submit_work(self, caplog: pytest.LogCaptureFixture):
# GIVEN
def my_fn():
pass

args = ("one", "two")
kwargs = {"three": 3, "four": 4}

mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_future_runner = MagicMock()
exc = Exception()
mock_future_runner.submit.side_effect = exc
mock_server._future_runner = mock_future_runner

# WHEN
result = BackgroundHTTPServer.submit(mock_server, my_fn, *args, **kwargs)

# THEN
mock_future_runner.submit.assert_called_once_with(my_fn, *args, **kwargs)
assert result.status == HTTPStatus.INTERNAL_SERVER_ERROR
assert result.body == str(exc)
assert "Failed to submit work: " in caplog.text


class TestBackgroundRequestHandler:
"""
Tests for the BackgroundRequestHandler class
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from unittest.mock import MagicMock
import pytest
from openjd.adaptor_runtime._background.server_response import ServerResponseGenerator
from openjd.adaptor_runtime._background.http_server import BackgroundHTTPServer
from http import HTTPStatus


class TestServerResponseGenerator:
def test_submits_work(self):
# GIVEN
def my_fn():
pass

args = ("one", "two")
kwargs = {"three": 3, "four": 4}

mock_future_runner = MagicMock()
mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_server._future_runner = mock_future_runner
mock_response_method = MagicMock()
mock_server_response = MagicMock()
mock_server_response.server = mock_server
mock_server_response.response_method = mock_response_method

# WHEN
ServerResponseGenerator.submit(mock_server_response, my_fn, *args, **kwargs)

# THEN
mock_future_runner.submit.assert_called_once_with(my_fn, *args, **kwargs)
mock_future_runner.wait_for_start.assert_called_once()
# assert mock_response_method.assert_called_once_with(HTTPStatus.OK)
mock_response_method.assert_called_once_with(HTTPStatus.OK)

def test_returns_500_if_fails_to_submit_work(self, caplog: pytest.LogCaptureFixture):
# GIVEN
def my_fn():
pass

args = ("one", "two")
kwargs = {"three": 3, "four": 4}

mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_future_runner = MagicMock()
exc = Exception()
mock_future_runner.submit.side_effect = exc
mock_server._future_runner = mock_future_runner
mock_response_method = MagicMock()
mock_server_response = MagicMock()
mock_server_response.server = mock_server
mock_server_response.response_method = mock_response_method

# WHEN
ServerResponseGenerator.submit(mock_server_response, my_fn, *args, **kwargs)

# THEN
mock_future_runner.submit.assert_called_once_with(my_fn, *args, **kwargs)
mock_response_method.assert_called_once_with(
HTTPStatus.INTERNAL_SERVER_ERROR, body=str(exc)
)

assert "Failed to submit work: " in caplog.text

0 comments on commit cdc3c7c

Please sign in to comment.