Skip to content

Commit

Permalink
refactor: Replace cancel queue with a shutdown event.
Browse files Browse the repository at this point in the history
Signed-off-by: Hongli Chen <[email protected]>
  • Loading branch information
Honglichenn committed Nov 28, 2023
1 parent ac80ce7 commit 86a6f28
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
from __future__ import annotations
import logging

import threading
from threading import Event, Thread
import time

from queue import Queue
from typing import List, Optional

from .named_pipe_request_handler import WinBackgroundResourceRequestHandler
Expand Down Expand Up @@ -72,15 +71,15 @@ def __init__(
self,
pipe_name: str,
adaptor_runner: AdaptorRunner,
cancel_queue: Queue,
shutdown_event: Event,
*,
log_buffer: LogBuffer | None = None,
): # pragma: no cover
"""
Args:
pipe_name (str): Name of the pipe for the NamedPipe Server.
adaptor_runner (AdaptorRunner): Adaptor runner instance for operation execution.
cancel_queue (Queue): Queue used for signaling server shutdown.
shutdown_event (Event): An Event used for signaling server shutdown.
log_buffer (LogBuffer|None, optional): Buffer for logging activities, defaults to None.
"""
if not OSName.is_windows():
Expand All @@ -89,7 +88,7 @@ def __init__(
f"Current Operating System is {OSName._get_os_name()}"
)
self._adaptor_runner = adaptor_runner
self._cancel_queue = cancel_queue
self._shutdown_event = shutdown_event
self._future_runner = AsyncFutureRunner()
self._log_buffer = log_buffer
self._named_pipe_instances: List[HANDLE] = []
Expand Down Expand Up @@ -134,9 +133,8 @@ def serve_forever(self) -> None:
and corresponding threads for handling client-server communication.
"""
_logger.info(f"Creating Named Pipe with name: {self._pipe_name}")
# During shutdown, a `True` will be pushed to the `_cancel_queue` for ending this loop
# TODO: Using threading.event instead of a queue to signal and termination
while self._cancel_queue.qsize() == 0:
# During shutdown, _shutdown_event will be set
while not self._shutdown_event.is_set():
pipe_handle = self._create_pipe(self._pipe_name)
if pipe_handle is None:
error_msg = (
Expand All @@ -159,16 +157,16 @@ def serve_forever(self) -> None:
else:
_logger.error(f"Error encountered while connecting to NamedPipe: {e} ")
request_handler = WinBackgroundResourceRequestHandler(self, pipe_handle)
threading.Thread(target=request_handler.instance_thread).start()
Thread(target=request_handler.instance_thread).start()

def shutdown(self) -> None:
"""
Shuts down the Named Pipe server and closes all named pipe handlers.
Signals the `serve_forever` method to stop listening to the NamedPipe Server by
pushing a `True` value into the `_cancel_queue`.
setting the _shutdown_event.
"""
self._cancel_queue.put(True)
self._shutdown_event.set()
# TODO: Need to find out a better way to wait for the communication finish
# After sending the shutdown command, we need to wait for the response
# from it before shutting down server or the client won't get the response.
Expand Down
15 changes: 7 additions & 8 deletions src/openjd/adaptor_runtime/_background/backend_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import logging
import os
import signal
from queue import Queue
from threading import Thread
from threading import Thread, Event
from types import FrameType
from typing import Optional, Union

Expand Down Expand Up @@ -71,7 +70,7 @@ def run(self) -> None:
that port to a connection file, and listens for HTTP requests until a shutdown is requested
"""
_logger.info("Running in background daemon mode.")
queue: Queue = Queue()
shutdown_event: Event = Event()

if OSName.is_posix():
server_path = SocketDirectories.for_os().get_process_socket_path(
Expand All @@ -87,14 +86,14 @@ def run(self) -> None:
self._server = WinBackgroundNamedPipeServer(
server_path,
self._adaptor_runner,
cancel_queue=queue,
shutdown_event=shutdown_event,
log_buffer=self._log_buffer,
)
else:
self._server = BackgroundHTTPServer(
server_path,
self._adaptor_runner,
cancel_queue=queue,
shutdown_event=shutdown_event,
log_buffer=self._log_buffer,
)
_logger.debug(f"Listening on {server_path}")
Expand All @@ -118,11 +117,11 @@ def run(self) -> None:
except OSError as e:
_logger.error(f"Error writing to connection file: {e}")
_logger.info("Shutting down server...")
queue.put(True)
shutdown_event.set()
raise
finally:
# Block until the cancel queue has been pushed to
queue.get()
# Block until the shutdown_event is set
shutdown_event.wait()

# Shutdown the server
self._server.shutdown() # type: ignore
Expand Down
6 changes: 3 additions & 3 deletions src/openjd/adaptor_runtime/_background/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
import socketserver
from http import HTTPStatus
from queue import Queue
from threading import Event
from typing import Callable
from .._osname import OSName
from .server_response import ServerResponseGenerator, AsyncFutureRunner
Expand Down Expand Up @@ -36,14 +36,14 @@ def __init__(
self,
socket_path: str,
adaptor_runner: AdaptorRunner,
cancel_queue: Queue,
shutdown_event: Event,
*,
log_buffer: LogBuffer | None = None,
bind_and_activate: bool = True,
) -> None: # pragma: no cover
super().__init__(socket_path, BackgroundRequestHandler, bind_and_activate) # type: ignore
self._adaptor_runner = adaptor_runner
self._cancel_queue = cancel_queue
self._shutdown_event = shutdown_event
self._future_runner = AsyncFutureRunner()
self._log_buffer = log_buffer

Expand Down
2 changes: 1 addition & 1 deletion src/openjd/adaptor_runtime/_background/server_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def generate_shutdown_put_response(self) -> HTTPResponse:
Linux: return HTTPResponse.
"""

self.server._cancel_queue.put(True)
self.server._shutdown_event.set()
return self.response_method(HTTPStatus.OK)

def generate_run_put_response(self) -> HTTPResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
import socketserver
from http import HTTPStatus
from queue import Queue
from threading import Event
from unittest.mock import MagicMock, PropertyMock, patch

import pytest
Expand Down Expand Up @@ -554,8 +554,8 @@ def test_signals_to_the_server_thread(self):
# GIVEN
mock_request_handler = MagicMock()
mock_server = MagicMock(spec=BackgroundHTTPServer)
mock_cancel_queue = MagicMock(spec=Queue)
mock_server._cancel_queue = mock_cancel_queue
mock_shutdown_event = MagicMock(spec=Event)
mock_server._shutdown_event = mock_shutdown_event
mock_request_handler.server = mock_server
mock_request_handler.headers = {"Content-Length": 0}
mock_request_handler.path = ""
Expand All @@ -565,7 +565,7 @@ def test_signals_to_the_server_thread(self):
response = handler.put()

# THEN
mock_cancel_queue.put.assert_called_once_with(True)
mock_shutdown_event.set.assert_called_once()
assert response.status == HTTPStatus.OK
assert response.body is None

Expand Down

0 comments on commit 86a6f28

Please sign in to comment.