From 70580a28a43863e40e362beb4c975a058b760a66 Mon Sep 17 00:00:00 2001 From: Hongli Chen Date: Wed, 29 Nov 2023 14:49:36 -0800 Subject: [PATCH] refactor: Extract the Namedpipe Server creation logic to a common class NamedPipeServer. Signed-off-by: Hongli Chen --- .../_background/backend_named_pipe_server.py | 161 +------------- .../background_named_pipe_request_handler.py | 95 +++++++++ .../adaptor_runtime/_named_pipe/__init__.py | 6 + .../named_pipe_request_handler.py | 77 ++----- .../_named_pipe/named_pipe_server.py | 196 ++++++++++++++++++ 5 files changed, 323 insertions(+), 212 deletions(-) create mode 100644 src/openjd/adaptor_runtime/_background/background_named_pipe_request_handler.py create mode 100644 src/openjd/adaptor_runtime/_named_pipe/__init__.py rename src/openjd/adaptor_runtime/{_background => _named_pipe}/named_pipe_request_handler.py (55%) create mode 100644 src/openjd/adaptor_runtime/_named_pipe/named_pipe_server.py diff --git a/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py b/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py index 203c536..2b7c9c7 100644 --- a/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py +++ b/src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py @@ -3,22 +3,16 @@ from __future__ import annotations import logging -from threading import Event, Thread -import time +from threading import Event +from typing import cast + +from pywintypes import HANDLE -from typing import List, Optional -from .named_pipe_request_handler import WinBackgroundResourceRequestHandler -from .server_config import NAMED_PIPE_BUFFER_SIZE, DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS +from .background_named_pipe_request_handler import WinBackgroundResourceRequestHandler from .server_response import AsyncFutureRunner -from .._osname import OSName +from .._named_pipe import NamedPipeServer -import win32pipe -import win32file -import pywintypes -import winerror -import win32api -from pywintypes import HANDLE from ..adaptors import AdaptorRunner from .log_buffers import LogBuffer @@ -27,39 +21,7 @@ _logger = logging.getLogger(__name__) -class MultipleErrors(Exception): - """ - Custom exception class to aggregate and handle multiple exceptions. - - This class is used to collect a list of exceptions that occur during a process, allowing - them to be raised together as a single exception. This is particularly useful in scenarios - where multiple operations are performed in a loop, and each operation could potentially - raise an exception. - """ - - def __init__(self, errors: List[Exception]): - """ - Initialize the MultipleErrors exception with a list of errors. - - Args: - errors (List[Exception]): A list of exceptions that have been raised. - """ - self.errors = errors - - def __str__(self) -> str: - """ - Return a string representation of all errors aggregated in this exception. - - This method concatenates the string representations of each individual exception - in the `errors` list, separated by semicolons. - - Returns: - str: A formatted string containing all the error messages. - """ - return "Multiple errors occurred: " + "; ".join(str(e) for e in self.errors) - - -class WinBackgroundNamedPipeServer: +class WinBackgroundNamedPipeServer(NamedPipeServer): """ A class to manage a Windows Named Pipe Server in background mode for the adaptor runtime communication. @@ -82,114 +44,13 @@ def __init__( shutdown_event (Event): An Event used for signaling server shutdown. log_buffer (LogBuffer|None, optional): Buffer for logging activities, defaults to None. """ - if not OSName.is_windows(): - raise OSError( - "WinBackgroundNamedPipeServer can be only used on Windows Operating Systems. " - f"Current Operating System is {OSName._get_os_name()}" - ) + super().__init__(pipe_name, shutdown_event) self._adaptor_runner = adaptor_runner self._shutdown_event = shutdown_event self._future_runner = AsyncFutureRunner() self._log_buffer = log_buffer - self._named_pipe_instances: List[HANDLE] = [] - self._pipe_name = pipe_name - # TODO: Need to figure out how to set the itme out for NamedPipe. - # Unlike Linux Server, time out can only be set in the Server side instead of the client side. - self._time_out = DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS - def _create_pipe(self, pipe_name: str) -> Optional[HANDLE]: - """ - Creates a new instance of a named pipe or an additional instance if the pipe already exists. - - Args: - pipe_name (str): Name of the pipe for which the instance is to be created. - - Returns: - HANDLE: The handler for the created named pipe instance. - - """ - - pipe_handle = win32pipe.CreateNamedPipe( - pipe_name, - # A bi-directional pipe; both server and client processes can read from and write to the pipe. - # win32file.FILE_FLAG_OVERLAPPED is used for async communication. - win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED, - win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT, - win32pipe.PIPE_UNLIMITED_INSTANCES, - NAMED_PIPE_BUFFER_SIZE, # nOutBufferSize - NAMED_PIPE_BUFFER_SIZE, # nInBufferSize - self._time_out, - None, # TODO: Add lpSecurityAttributes here to limit the access + def request_handler(self, server: "NamedPipeServer", pipe_handle: HANDLE): + return WinBackgroundResourceRequestHandler( + cast("WinBackgroundNamedPipeServer", server), pipe_handle ) - if pipe_handle == win32file.INVALID_HANDLE_VALUE: - return None - return pipe_handle - - def serve_forever(self) -> None: - """ - Runs the Named Pipe Server continuously until a shutdown signal is received. - - This method listens to the NamedPipe Server and creates new instances of named pipes - and corresponding threads for handling client-server communication. - """ - _logger.info(f"Creating Named Pipe with name: {self._pipe_name}") - # During shutdown, _shutdown_event will be set - while not self._shutdown_event.is_set(): - pipe_handle = self._create_pipe(self._pipe_name) - if pipe_handle is None: - error_msg = ( - f"Failed to create named pipe instance: " - f"{win32api.FormatMessage(win32api.GetLastError())}" - ) - _logger.error(error_msg) - raise RuntimeError(error_msg) - self._named_pipe_instances.append(pipe_handle) - _logger.debug("Waiting for connection from the client...") - - try: - win32pipe.ConnectNamedPipe(pipe_handle, None) - except pywintypes.error as e: - if e.winerror == winerror.ERROR_PIPE_NOT_CONNECTED: - _logger.info( - "NamedPipe Server is shutdown. Exit the main thread in the backend server." - ) - break - else: - _logger.error(f"Error encountered while connecting to NamedPipe: {e} ") - request_handler = WinBackgroundResourceRequestHandler(self, pipe_handle) - Thread(target=request_handler.instance_thread).start() - - def shutdown(self) -> None: - """ - Shuts down the Named Pipe server and closes all named pipe handlers. - - Signals the `serve_forever` method to stop listening to the NamedPipe Server by - setting the _shutdown_event. - """ - self._shutdown_event.set() - # TODO: Need to find out a better way to wait for the communication finish - # After sending the shutdown command, we need to wait for the response - # from it before shutting down server or the client won't get the response. - time.sleep(1) - error_list: List[Exception] = [] - while self._named_pipe_instances: - pipe_handle = self._named_pipe_instances.pop() - try: - win32pipe.DisconnectNamedPipe(pipe_handle) - win32file.CloseHandle(pipe_handle) - except pywintypes.error as e: - # If the communication is finished then handler may be closed - if e.args[0] == winerror.ERROR_INVALID_HANDLE: - pass - except Exception as e: - import traceback - - _logger.error( - f"Encountered the following error " - f"while shutting down the WinBackgroundNamedPipeServer: {str(traceback.format_exc())}" - ) - # Store any errors to raise after closing all pipe handles, - # allowing handling of multiple errors during shutdown. - error_list.append(e) - if error_list: - raise MultipleErrors(error_list) diff --git a/src/openjd/adaptor_runtime/_background/background_named_pipe_request_handler.py b/src/openjd/adaptor_runtime/_background/background_named_pipe_request_handler.py new file mode 100644 index 0000000..9a94403 --- /dev/null +++ b/src/openjd/adaptor_runtime/_background/background_named_pipe_request_handler.py @@ -0,0 +1,95 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +import json +from typing import TYPE_CHECKING, cast + +from .._named_pipe import ResourceRequestHandler + +if TYPE_CHECKING: # pragma: no cover because pytest will think we should test for this. + from .backend_named_pipe_server import WinBackgroundNamedPipeServer + +from openjd.adaptor_runtime._background.server_response import ServerResponseGenerator + +from pywintypes import HANDLE +import logging + +_logger = logging.getLogger(__name__) + + +class WinBackgroundResourceRequestHandler(ResourceRequestHandler): + """ + A handler for managing requests sent to a NamedPipe instance within a Windows environment. + + This class handles incoming requests, processes them, and sends back appropriate responses. + It is designed to work in conjunction with a WinBackgroundNamedPipeServer that manages the + lifecycle of the NamedPipe server and other associated resources. + """ + + def __init__(self, server: "WinBackgroundNamedPipeServer", pipe_handle: HANDLE): + """ + Initializes the WinBackgroundResourceRequestHandler with a server and pipe handle. + + Args: + server(WinBackgroundNamedPipeServer): The server instance that created this handler. + It is responsible for managing the lifecycle of the NamedPipe server and other resources. + pipe_handle(pipe_handle): The handle to the NamedPipe instance created and managed by the server. + pipe_handle(HANDLE): pipe_handle(HANDLE): Handle for the NamedPipe, established by the instance. + Utilized for message read/write operations. + """ + super().__init__(server, pipe_handle) + + def handle_request(self, data: str): + """ + Processes an incoming request and routes it to the correct response handler based on the method + and request path. + + Args: + data: A string containing the message sent from the client. + """ + request_dict = json.loads(data) + path = request_dict["path"] + body = json.loads(request_dict["body"]) + method = request_dict["method"] + + if "params" in request_dict and request_dict["params"] != "null": + query_string_params = json.loads(request_dict["params"]) + else: + query_string_params = {} + + server_operation = ServerResponseGenerator( + cast("WinBackgroundNamedPipeServer", self.server), + self.send_response, + body, + query_string_params, + ) + try: + # TODO: Code refactoring to get rid of the `if...elif..` by using getattr + if path == "/run" and method == "PUT": + server_operation.generate_run_put_response() + + elif path == "/shutdown" and method == "PUT": + server_operation.generate_shutdown_put_response() + + elif path == "/heartbeat" and method == "GET": + _ACK_ID_KEY = ServerResponseGenerator.ACK_ID_KEY + + def _parse_ack_id(): + if _ACK_ID_KEY in query_string_params: + return query_string_params[_ACK_ID_KEY] + + server_operation.generate_heartbeat_get_response(_parse_ack_id) + + elif path == "/start" and method == "PUT": + server_operation.generate_start_put_response() + + elif path == "/stop" and method == "PUT": + server_operation.generate_stop_put_response() + + elif path == "/cancel" and method == "PUT": + server_operation.generate_cancel_put_response() + except Exception as e: + _logger.error( + f"Error encountered in request handling. " + f"Path: '{path}', Method: '{method}', Error: '{str(e)}'" + ) + raise diff --git a/src/openjd/adaptor_runtime/_named_pipe/__init__.py b/src/openjd/adaptor_runtime/_named_pipe/__init__.py new file mode 100644 index 0000000..5949222 --- /dev/null +++ b/src/openjd/adaptor_runtime/_named_pipe/__init__.py @@ -0,0 +1,6 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +from .named_pipe_server import NamedPipeServer +from .named_pipe_request_handler import ResourceRequestHandler + +__all__ = ["NamedPipeServer", "ResourceRequestHandler"] diff --git a/src/openjd/adaptor_runtime/_background/named_pipe_request_handler.py b/src/openjd/adaptor_runtime/_named_pipe/named_pipe_request_handler.py similarity index 55% rename from src/openjd/adaptor_runtime/_background/named_pipe_request_handler.py rename to src/openjd/adaptor_runtime/_named_pipe/named_pipe_request_handler.py index 0e236da..e515738 100644 --- a/src/openjd/adaptor_runtime/_background/named_pipe_request_handler.py +++ b/src/openjd/adaptor_runtime/_named_pipe/named_pipe_request_handler.py @@ -3,19 +3,21 @@ import json from typing import TYPE_CHECKING + if TYPE_CHECKING: # pragma: no cover because pytest will think we should test for this. - from .backend_named_pipe_server import WinBackgroundNamedPipeServer + from openjd.adaptor_runtime._named_pipe import NamedPipeServer + from openjd.adaptor_runtime._background.named_pipe_helper import ( NamedPipeHelper, PipeDisconnectedException, ) -from openjd.adaptor_runtime._background.server_response import ServerResponseGenerator import win32pipe import win32file from pywintypes import HANDLE from http import HTTPStatus import logging import traceback +from abc import ABC, abstractmethod from openjd.adaptor_runtime._osname import OSName @@ -23,27 +25,27 @@ _logger = logging.getLogger(__name__) -class WinBackgroundResourceRequestHandler: +class ResourceRequestHandler(ABC): """ A handler for managing requests sent to a NamedPipe instance within a Windows environment. - This class handles incoming requests, processes them, and sends back appropriate responses. - It is designed to work in conjunction with a WinBackgroundNamedPipeServer that manages the - lifecycle of the NamedPipe server and other associated resources. """ - def __init__(self, server: "WinBackgroundNamedPipeServer", pipe_handle: HANDLE): + def __init__(self, server: "NamedPipeServer", pipe_handle: HANDLE): """ - Initializes the WinBackgroundResourceRequestHandler with a server and pipe handle. + Initializes the ResourceRequestHandler with a server and pipe handle. Args: - server(WinBackgroundNamedPipeServer): The server instance that created this handler. + server(NamedPipeServer): The server instance that created this handler. It is responsible for managing the lifecycle of the NamedPipe server and other resources. pipe_handle(pipe_handle): The handle to the NamedPipe instance created and managed by the server. + pipe_handle(HANDLE): pipe_handle(HANDLE): Handle for the NamedPipe, established by the instance. + Utilized for message read/write operations. """ + self._handler_type_name = self.__class__.__name__ if not OSName.is_windows(): raise OSError( - "WinBackgroundResourceRequestHandler can be only used on Windows Operating Systems. " + f"{self._handler_type_name} can be only used on Windows Operating Systems. " f"Current Operating System is {OSName._get_os_name()}" ) self.server = server @@ -86,7 +88,7 @@ def instance_thread(self) -> None: _logger.error( f"Encountered an error while closing the named pipe: {traceback.format_exc()}" ) - _logger.debug("WinBackgroundResourceRequestHandler instance thread exited.") + _logger.debug(f"{self._handler_type_name} instance thread exited.") def send_response(self, status: HTTPStatus, body: str = ""): """ @@ -100,55 +102,6 @@ def send_response(self, status: HTTPStatus, body: str = ""): _logger.debug(f"NamedPipe Server: Send Response: {response}") NamedPipeHelper.write_to_pipe(self.pipe_handle, json.dumps(response)) + @abstractmethod def handle_request(self, data: str): - """ - Processes an incoming request and routes it to the correct response handler based on the method - and request path. - - Args: - data: A string containing the message sent from the client. - """ - request_dict = json.loads(data) - path = request_dict["path"] - body = json.loads(request_dict["body"]) - method = request_dict["method"] - - if "params" in request_dict and request_dict["params"] != "null": - query_string_params = json.loads(request_dict["params"]) - else: - query_string_params = {} - - server_operation = ServerResponseGenerator( - self.server, self.send_response, body, query_string_params - ) - try: - # TODO: Code refactoring to get rid of the `if...elif..` by using getattr - if path == "/run" and method == "PUT": - server_operation.generate_run_put_response() - - elif path == "/shutdown" and method == "PUT": - server_operation.generate_shutdown_put_response() - - elif path == "/heartbeat" and method == "GET": - _ACK_ID_KEY = ServerResponseGenerator.ACK_ID_KEY - - def _parse_ack_id(): - if _ACK_ID_KEY in query_string_params: - return query_string_params[_ACK_ID_KEY] - - server_operation.generate_heartbeat_get_response(_parse_ack_id) - - elif path == "/start" and method == "PUT": - server_operation.generate_start_put_response() - - elif path == "/stop" and method == "PUT": - server_operation.generate_stop_put_response() - - elif path == "/cancel" and method == "PUT": - server_operation.generate_cancel_put_response() - except Exception as e: - _logger.error( - f"Error encountered in request handling. " - f"Path: '{path}', Method: '{method}', Error: '{str(e)}'" - ) - raise + raise NotImplementedError diff --git a/src/openjd/adaptor_runtime/_named_pipe/named_pipe_server.py b/src/openjd/adaptor_runtime/_named_pipe/named_pipe_server.py new file mode 100644 index 0000000..cc8975a --- /dev/null +++ b/src/openjd/adaptor_runtime/_named_pipe/named_pipe_server.py @@ -0,0 +1,196 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +from __future__ import annotations +import logging + +import threading +from threading import Event +import time + +from typing import List, Optional + +from openjd.adaptor_runtime._background.server_config import ( + NAMED_PIPE_BUFFER_SIZE, + DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS, +) +from openjd.adaptor_runtime._osname import OSName + +import win32pipe +import win32file +import pywintypes +import winerror +import win32api +from pywintypes import HANDLE + +from abc import ABC, abstractmethod + + +_logger = logging.getLogger(__name__) + + +class MultipleErrors(Exception): + """ + Custom exception class to aggregate and handle multiple exceptions. + + This class is used to collect a list of exceptions that occur during a process, allowing + them to be raised together as a single exception. This is particularly useful in scenarios + where multiple operations are performed in a loop, and each operation could potentially + raise an exception. + """ + + def __init__(self, errors: List[Exception]): + """ + Initialize the MultipleErrors exception with a list of errors. + + Args: + errors (List[Exception]): A list of exceptions that have been raised. + """ + self.errors = errors + + def __str__(self) -> str: + """ + Return a string representation of all errors aggregated in this exception. + + This method concatenates the string representations of each individual exception + in the `errors` list, separated by semicolons. + + Returns: + str: A formatted string containing all the error messages. + """ + return "Multiple errors occurred: " + "; ".join(str(e) for e in self.errors) + + +class NamedPipeServer(ABC): + """ + A class to manage a Windows Named Pipe Server in background mode for the adaptor runtime communication. + + This class encapsulates stateful information of the adaptor backend and provides methods + for server initialization, operation, and shutdown. + """ + + def __init__(self, pipe_name: str, shutdown_event: Event): # pragma: no cover + """ + Args: + pipe_name (str): Name of the pipe for the NamedPipe Server. + shutdown_event (Event): An Event used for signaling server shutdown. + """ + self._server_type_name = self.__class__.__name__ + if not OSName.is_windows(): + raise OSError( + f"{self._server_type_name} can be only used on Windows Operating Systems. " + f"Current Operating System is {OSName._get_os_name()}" + ) + self._named_pipe_instances: List[HANDLE] = [] + self._pipe_name = pipe_name + self._shutdown_event = shutdown_event + # TODO: Need to figure out how to set the itme out for NamedPipe. + # Unlike Linux Server, time out can only be set in the Server side instead of the client side. + self._time_out = DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS + + def _create_pipe(self, pipe_name: str) -> Optional[HANDLE]: + """ + Creates a new instance of a named pipe or an additional instance if the pipe already exists. + + Args: + pipe_name (str): Name of the pipe for which the instance is to be created. + + Returns: + HANDLE: The handler for the created named pipe instance. + + """ + + pipe_handle = win32pipe.CreateNamedPipe( + pipe_name, + # A bi-directional pipe; both server and client processes can read from and write to the pipe. + # win32file.FILE_FLAG_OVERLAPPED is used for async communication. + win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED, + win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT, + win32pipe.PIPE_UNLIMITED_INSTANCES, + NAMED_PIPE_BUFFER_SIZE, # nOutBufferSize + NAMED_PIPE_BUFFER_SIZE, # nInBufferSize + self._time_out, + None, # TODO: Add lpSecurityAttributes here to limit the access + ) + if pipe_handle == win32file.INVALID_HANDLE_VALUE: + return None + return pipe_handle + + def serve_forever(self) -> None: + """ + Runs the Named Pipe Server continuously until a shutdown signal is received. + + This method listens to the NamedPipe Server and creates new instances of named pipes + and corresponding threads for handling client-server communication. + """ + _logger.info(f"Creating Named Pipe with name: {self._pipe_name}") + print(f"Creating Named Pipe with name: {self._pipe_name}") + # During shutdown, a `True` will be pushed to the `_cancel_queue` for ending this loop + # TODO: Using threading.event instead of a queue to signal and termination + while not self._shutdown_event.is_set(): + pipe_handle = self._create_pipe(self._pipe_name) + if pipe_handle is None: + error_msg = ( + f"Failed to create named pipe instance: " + f"{win32api.FormatMessage(win32api.GetLastError())}" + ) + _logger.error(error_msg) + raise RuntimeError(error_msg) + self._named_pipe_instances.append(pipe_handle) + _logger.debug("Waiting for connection from the client...") + print("Waiting for connection from the client...") + + try: + win32pipe.ConnectNamedPipe(pipe_handle, None) + except pywintypes.error as e: + if e.winerror == winerror.ERROR_PIPE_NOT_CONNECTED: + _logger.info( + "NamedPipe Server is shutdown. Exit the main thread in the backend server." + ) + print( + "NamedPipe Server is shutdown. Exit the main thread in the backend server." + ) + break + else: + _logger.error(f"Error encountered while connecting to NamedPipe: {e} ") + print(f"Error encountered while connecting to NamedPipe: {e} ") + print("Handling response") + threading.Thread(target=self.request_handler(self, pipe_handle).instance_thread).start() + + @abstractmethod + def request_handler(self, server: NamedPipeServer, pipe_handle: HANDLE): + return NotImplemented + + def shutdown(self) -> None: + """ + Shuts down the Named Pipe server and closes all named pipe handlers. + + Signals the `serve_forever` method to stop listening to the NamedPipe Server by + pushing a `True` value into the `_cancel_queue`. + """ + self._shutdown_event.set() + # TODO: Need to find out a better way to wait for the communication finish + # After sending the shutdown command, we need to wait for the response + # from it before shutting down server or the client won't get the response. + time.sleep(1) + error_list: List[Exception] = [] + while self._named_pipe_instances: + pipe_handle = self._named_pipe_instances.pop() + try: + win32pipe.DisconnectNamedPipe(pipe_handle) + win32file.CloseHandle(pipe_handle) + except pywintypes.error as e: + # If the communication is finished then handler may be closed + if e.args[0] == winerror.ERROR_INVALID_HANDLE: + pass + except Exception as e: + import traceback + + _logger.error( + f"Encountered the following error " + f"while shutting down the {self._server_type_name}: {str(traceback.format_exc())}" + ) + # Store any errors to raise after closing all pipe handles, + # allowing handling of multiple errors during shutdown. + error_list.append(e) + if error_list: + raise MultipleErrors(error_list)