Skip to content

Commit

Permalink
refactor: Extract the Namedpipe Server creation logic to a common cla…
Browse files Browse the repository at this point in the history
…ss NamedPipeServer.

Signed-off-by: Hongli Chen <[email protected]>
  • Loading branch information
Honglichenn committed Nov 29, 2023
1 parent 256a223 commit 70580a2
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 212 deletions.
161 changes: 11 additions & 150 deletions src/openjd/adaptor_runtime/_background/backend_named_pipe_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,16 @@
from __future__ import annotations
import logging

from threading import Event, Thread
import time
from threading import Event
from typing import cast

from pywintypes import HANDLE

from typing import List, Optional

from .named_pipe_request_handler import WinBackgroundResourceRequestHandler
from .server_config import NAMED_PIPE_BUFFER_SIZE, DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS
from .background_named_pipe_request_handler import WinBackgroundResourceRequestHandler
from .server_response import AsyncFutureRunner
from .._osname import OSName
from .._named_pipe import NamedPipeServer

import win32pipe
import win32file
import pywintypes
import winerror
import win32api
from pywintypes import HANDLE

from ..adaptors import AdaptorRunner
from .log_buffers import LogBuffer
Expand All @@ -27,39 +21,7 @@
_logger = logging.getLogger(__name__)


class MultipleErrors(Exception):
"""
Custom exception class to aggregate and handle multiple exceptions.
This class is used to collect a list of exceptions that occur during a process, allowing
them to be raised together as a single exception. This is particularly useful in scenarios
where multiple operations are performed in a loop, and each operation could potentially
raise an exception.
"""

def __init__(self, errors: List[Exception]):
"""
Initialize the MultipleErrors exception with a list of errors.
Args:
errors (List[Exception]): A list of exceptions that have been raised.
"""
self.errors = errors

def __str__(self) -> str:
"""
Return a string representation of all errors aggregated in this exception.
This method concatenates the string representations of each individual exception
in the `errors` list, separated by semicolons.
Returns:
str: A formatted string containing all the error messages.
"""
return "Multiple errors occurred: " + "; ".join(str(e) for e in self.errors)


class WinBackgroundNamedPipeServer:
class WinBackgroundNamedPipeServer(NamedPipeServer):
"""
A class to manage a Windows Named Pipe Server in background mode for the adaptor runtime communication.
Expand All @@ -82,114 +44,13 @@ def __init__(
shutdown_event (Event): An Event used for signaling server shutdown.
log_buffer (LogBuffer|None, optional): Buffer for logging activities, defaults to None.
"""
if not OSName.is_windows():
raise OSError(
"WinBackgroundNamedPipeServer can be only used on Windows Operating Systems. "
f"Current Operating System is {OSName._get_os_name()}"
)
super().__init__(pipe_name, shutdown_event)
self._adaptor_runner = adaptor_runner
self._shutdown_event = shutdown_event
self._future_runner = AsyncFutureRunner()
self._log_buffer = log_buffer
self._named_pipe_instances: List[HANDLE] = []
self._pipe_name = pipe_name
# TODO: Need to figure out how to set the itme out for NamedPipe.
# Unlike Linux Server, time out can only be set in the Server side instead of the client side.
self._time_out = DEFAULT_NAMED_PIPE_TIMEOUT_MILLISECONDS

def _create_pipe(self, pipe_name: str) -> Optional[HANDLE]:
"""
Creates a new instance of a named pipe or an additional instance if the pipe already exists.
Args:
pipe_name (str): Name of the pipe for which the instance is to be created.
Returns:
HANDLE: The handler for the created named pipe instance.
"""

pipe_handle = win32pipe.CreateNamedPipe(
pipe_name,
# A bi-directional pipe; both server and client processes can read from and write to the pipe.
# win32file.FILE_FLAG_OVERLAPPED is used for async communication.
win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
win32pipe.PIPE_UNLIMITED_INSTANCES,
NAMED_PIPE_BUFFER_SIZE, # nOutBufferSize
NAMED_PIPE_BUFFER_SIZE, # nInBufferSize
self._time_out,
None, # TODO: Add lpSecurityAttributes here to limit the access
def request_handler(self, server: "NamedPipeServer", pipe_handle: HANDLE):
return WinBackgroundResourceRequestHandler(
cast("WinBackgroundNamedPipeServer", server), pipe_handle
)
if pipe_handle == win32file.INVALID_HANDLE_VALUE:
return None
return pipe_handle

def serve_forever(self) -> None:
"""
Runs the Named Pipe Server continuously until a shutdown signal is received.
This method listens to the NamedPipe Server and creates new instances of named pipes
and corresponding threads for handling client-server communication.
"""
_logger.info(f"Creating Named Pipe with name: {self._pipe_name}")
# During shutdown, _shutdown_event will be set
while not self._shutdown_event.is_set():
pipe_handle = self._create_pipe(self._pipe_name)
if pipe_handle is None:
error_msg = (
f"Failed to create named pipe instance: "
f"{win32api.FormatMessage(win32api.GetLastError())}"
)
_logger.error(error_msg)
raise RuntimeError(error_msg)
self._named_pipe_instances.append(pipe_handle)
_logger.debug("Waiting for connection from the client...")

try:
win32pipe.ConnectNamedPipe(pipe_handle, None)
except pywintypes.error as e:
if e.winerror == winerror.ERROR_PIPE_NOT_CONNECTED:
_logger.info(
"NamedPipe Server is shutdown. Exit the main thread in the backend server."
)
break
else:
_logger.error(f"Error encountered while connecting to NamedPipe: {e} ")
request_handler = WinBackgroundResourceRequestHandler(self, pipe_handle)
Thread(target=request_handler.instance_thread).start()

def shutdown(self) -> None:
"""
Shuts down the Named Pipe server and closes all named pipe handlers.
Signals the `serve_forever` method to stop listening to the NamedPipe Server by
setting the _shutdown_event.
"""
self._shutdown_event.set()
# TODO: Need to find out a better way to wait for the communication finish
# After sending the shutdown command, we need to wait for the response
# from it before shutting down server or the client won't get the response.
time.sleep(1)
error_list: List[Exception] = []
while self._named_pipe_instances:
pipe_handle = self._named_pipe_instances.pop()
try:
win32pipe.DisconnectNamedPipe(pipe_handle)
win32file.CloseHandle(pipe_handle)
except pywintypes.error as e:
# If the communication is finished then handler may be closed
if e.args[0] == winerror.ERROR_INVALID_HANDLE:
pass
except Exception as e:
import traceback

_logger.error(
f"Encountered the following error "
f"while shutting down the WinBackgroundNamedPipeServer: {str(traceback.format_exc())}"
)
# Store any errors to raise after closing all pipe handles,
# allowing handling of multiple errors during shutdown.
error_list.append(e)
if error_list:
raise MultipleErrors(error_list)
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

import json
from typing import TYPE_CHECKING, cast

from .._named_pipe import ResourceRequestHandler

if TYPE_CHECKING: # pragma: no cover because pytest will think we should test for this.
from .backend_named_pipe_server import WinBackgroundNamedPipeServer

from openjd.adaptor_runtime._background.server_response import ServerResponseGenerator

from pywintypes import HANDLE
import logging

_logger = logging.getLogger(__name__)


class WinBackgroundResourceRequestHandler(ResourceRequestHandler):
"""
A handler for managing requests sent to a NamedPipe instance within a Windows environment.
This class handles incoming requests, processes them, and sends back appropriate responses.
It is designed to work in conjunction with a WinBackgroundNamedPipeServer that manages the
lifecycle of the NamedPipe server and other associated resources.
"""

def __init__(self, server: "WinBackgroundNamedPipeServer", pipe_handle: HANDLE):
"""
Initializes the WinBackgroundResourceRequestHandler with a server and pipe handle.
Args:
server(WinBackgroundNamedPipeServer): The server instance that created this handler.
It is responsible for managing the lifecycle of the NamedPipe server and other resources.
pipe_handle(pipe_handle): The handle to the NamedPipe instance created and managed by the server.
pipe_handle(HANDLE): pipe_handle(HANDLE): Handle for the NamedPipe, established by the instance.
Utilized for message read/write operations.
"""
super().__init__(server, pipe_handle)

def handle_request(self, data: str):
"""
Processes an incoming request and routes it to the correct response handler based on the method
and request path.
Args:
data: A string containing the message sent from the client.
"""
request_dict = json.loads(data)
path = request_dict["path"]
body = json.loads(request_dict["body"])
method = request_dict["method"]

if "params" in request_dict and request_dict["params"] != "null":
query_string_params = json.loads(request_dict["params"])
else:
query_string_params = {}

server_operation = ServerResponseGenerator(
cast("WinBackgroundNamedPipeServer", self.server),
self.send_response,
body,
query_string_params,
)
try:
# TODO: Code refactoring to get rid of the `if...elif..` by using getattr
if path == "/run" and method == "PUT":
server_operation.generate_run_put_response()

elif path == "/shutdown" and method == "PUT":
server_operation.generate_shutdown_put_response()

elif path == "/heartbeat" and method == "GET":
_ACK_ID_KEY = ServerResponseGenerator.ACK_ID_KEY

def _parse_ack_id():
if _ACK_ID_KEY in query_string_params:
return query_string_params[_ACK_ID_KEY]

server_operation.generate_heartbeat_get_response(_parse_ack_id)

elif path == "/start" and method == "PUT":
server_operation.generate_start_put_response()

elif path == "/stop" and method == "PUT":
server_operation.generate_stop_put_response()

elif path == "/cancel" and method == "PUT":
server_operation.generate_cancel_put_response()
except Exception as e:
_logger.error(
f"Error encountered in request handling. "
f"Path: '{path}', Method: '{method}', Error: '{str(e)}'"
)
raise
6 changes: 6 additions & 0 deletions src/openjd/adaptor_runtime/_named_pipe/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from .named_pipe_server import NamedPipeServer
from .named_pipe_request_handler import ResourceRequestHandler

__all__ = ["NamedPipeServer", "ResourceRequestHandler"]
Loading

0 comments on commit 70580a2

Please sign in to comment.