From 86a6f284225be4d120ccd297750093be02b5b5c8 Mon Sep 17 00:00:00 2001 From: Hongli Chen Date: Tue, 28 Nov 2023 12:45:08 -0800 Subject: [PATCH] refactor: Replace cancel queue with a shutdown event. Signed-off-by: Hongli Chen --- .../_background/backend_named_pipe_server.py | 20 +++++++++---------- .../_background/backend_runner.py | 15 +++++++------- .../_background/http_server.py | 6 +++--- .../_background/server_response.py | 2 +- .../unit/background/test_http_server.py | 8 ++++---- 5 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py b/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py index 6f1c5b5..203c536 100644 --- a/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py +++ b/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py @@ -3,10 +3,9 @@ from __future__ import annotations import logging -import threading +from threading import Event, Thread import time -from queue import Queue from typing import List, Optional from .named_pipe_request_handler import WinBackgroundResourceRequestHandler @@ -72,7 +71,7 @@ def __init__( self, pipe_name: str, adaptor_runner: AdaptorRunner, - cancel_queue: Queue, + shutdown_event: Event, *, log_buffer: LogBuffer | None = None, ): # pragma: no cover @@ -80,7 +79,7 @@ def __init__( Args: pipe_name (str): Name of the pipe for the NamedPipe Server. adaptor_runner (AdaptorRunner): Adaptor runner instance for operation execution. - cancel_queue (Queue): Queue used for signaling server shutdown. + shutdown_event (Event): An Event used for signaling server shutdown. log_buffer (LogBuffer|None, optional): Buffer for logging activities, defaults to None. """ if not OSName.is_windows(): @@ -89,7 +88,7 @@ def __init__( f"Current Operating System is {OSName._get_os_name()}" ) self._adaptor_runner = adaptor_runner - self._cancel_queue = cancel_queue + self._shutdown_event = shutdown_event self._future_runner = AsyncFutureRunner() self._log_buffer = log_buffer self._named_pipe_instances: List[HANDLE] = [] @@ -134,9 +133,8 @@ def serve_forever(self) -> None: and corresponding threads for handling client-server communication. """ _logger.info(f"Creating Named Pipe with name: {self._pipe_name}") - # During shutdown, a `True` will be pushed to the `_cancel_queue` for ending this loop - # TODO: Using threading.event instead of a queue to signal and termination - while self._cancel_queue.qsize() == 0: + # During shutdown, _shutdown_event will be set + while not self._shutdown_event.is_set(): pipe_handle = self._create_pipe(self._pipe_name) if pipe_handle is None: error_msg = ( @@ -159,16 +157,16 @@ def serve_forever(self) -> None: else: _logger.error(f"Error encountered while connecting to NamedPipe: {e} ") request_handler = WinBackgroundResourceRequestHandler(self, pipe_handle) - threading.Thread(target=request_handler.instance_thread).start() + Thread(target=request_handler.instance_thread).start() def shutdown(self) -> None: """ Shuts down the Named Pipe server and closes all named pipe handlers. Signals the `serve_forever` method to stop listening to the NamedPipe Server by - pushing a `True` value into the `_cancel_queue`. + setting the _shutdown_event. """ - self._cancel_queue.put(True) + self._shutdown_event.set() # TODO: Need to find out a better way to wait for the communication finish # After sending the shutdown command, we need to wait for the response # from it before shutting down server or the client won't get the response. diff --git a/src/openjd/adaptor_runtime/_background/backend_runner.py b/src/openjd/adaptor_runtime/_background/backend_runner.py index 54674b6..b33ce10 100644 --- a/src/openjd/adaptor_runtime/_background/backend_runner.py +++ b/src/openjd/adaptor_runtime/_background/backend_runner.py @@ -6,8 +6,7 @@ import logging import os import signal -from queue import Queue -from threading import Thread +from threading import Thread, Event from types import FrameType from typing import Optional, Union @@ -71,7 +70,7 @@ def run(self) -> None: that port to a connection file, and listens for HTTP requests until a shutdown is requested """ _logger.info("Running in background daemon mode.") - queue: Queue = Queue() + shutdown_event: Event = Event() if OSName.is_posix(): server_path = SocketDirectories.for_os().get_process_socket_path( @@ -87,14 +86,14 @@ def run(self) -> None: self._server = WinBackgroundNamedPipeServer( server_path, self._adaptor_runner, - cancel_queue=queue, + shutdown_event=shutdown_event, log_buffer=self._log_buffer, ) else: self._server = BackgroundHTTPServer( server_path, self._adaptor_runner, - cancel_queue=queue, + shutdown_event=shutdown_event, log_buffer=self._log_buffer, ) _logger.debug(f"Listening on {server_path}") @@ -118,11 +117,11 @@ def run(self) -> None: except OSError as e: _logger.error(f"Error writing to connection file: {e}") _logger.info("Shutting down server...") - queue.put(True) + shutdown_event.set() raise finally: - # Block until the cancel queue has been pushed to - queue.get() + # Block until the shutdown_event is set + shutdown_event.wait() # Shutdown the server self._server.shutdown() # type: ignore diff --git a/src/openjd/adaptor_runtime/_background/http_server.py b/src/openjd/adaptor_runtime/_background/http_server.py index 960152e..9636000 100644 --- a/src/openjd/adaptor_runtime/_background/http_server.py +++ b/src/openjd/adaptor_runtime/_background/http_server.py @@ -6,7 +6,7 @@ import logging import socketserver from http import HTTPStatus -from queue import Queue +from threading import Event from typing import Callable from .._osname import OSName from .server_response import ServerResponseGenerator, AsyncFutureRunner @@ -36,14 +36,14 @@ def __init__( self, socket_path: str, adaptor_runner: AdaptorRunner, - cancel_queue: Queue, + shutdown_event: Event, *, log_buffer: LogBuffer | None = None, bind_and_activate: bool = True, ) -> None: # pragma: no cover super().__init__(socket_path, BackgroundRequestHandler, bind_and_activate) # type: ignore self._adaptor_runner = adaptor_runner - self._cancel_queue = cancel_queue + self._shutdown_event = shutdown_event self._future_runner = AsyncFutureRunner() self._log_buffer = log_buffer diff --git a/src/openjd/adaptor_runtime/_background/server_response.py b/src/openjd/adaptor_runtime/_background/server_response.py index d22677d..44203fc 100644 --- a/src/openjd/adaptor_runtime/_background/server_response.py +++ b/src/openjd/adaptor_runtime/_background/server_response.py @@ -163,7 +163,7 @@ def generate_shutdown_put_response(self) -> HTTPResponse: Linux: return HTTPResponse. """ - self.server._cancel_queue.put(True) + self.server._shutdown_event.set() return self.response_method(HTTPStatus.OK) def generate_run_put_response(self) -> HTTPResponse: diff --git a/test/openjd/adaptor_runtime/unit/background/test_http_server.py b/test/openjd/adaptor_runtime/unit/background/test_http_server.py index cf6fbcb..d6fd343 100644 --- a/test/openjd/adaptor_runtime/unit/background/test_http_server.py +++ b/test/openjd/adaptor_runtime/unit/background/test_http_server.py @@ -4,7 +4,7 @@ import os import socketserver from http import HTTPStatus -from queue import Queue +from threading import Event from unittest.mock import MagicMock, PropertyMock, patch import pytest @@ -554,8 +554,8 @@ def test_signals_to_the_server_thread(self): # GIVEN mock_request_handler = MagicMock() mock_server = MagicMock(spec=BackgroundHTTPServer) - mock_cancel_queue = MagicMock(spec=Queue) - mock_server._cancel_queue = mock_cancel_queue + mock_shutdown_event = MagicMock(spec=Event) + mock_server._shutdown_event = mock_shutdown_event mock_request_handler.server = mock_server mock_request_handler.headers = {"Content-Length": 0} mock_request_handler.path = "" @@ -565,7 +565,7 @@ def test_signals_to_the_server_thread(self): response = handler.put() # THEN - mock_cancel_queue.put.assert_called_once_with(True) + mock_shutdown_event.set.assert_called_once() assert response.status == HTTPStatus.OK assert response.body is None