From 61022b5dce7d2692215b9f122aacc772cc2c8751 Mon Sep 17 00:00:00 2001 From: Peter Harris Date: Tue, 17 Dec 2024 15:30:28 +0000 Subject: [PATCH 1/3] Clean up server code --- lglpy/server.py | 316 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 224 insertions(+), 92 deletions(-) diff --git a/lglpy/server.py b/lglpy/server.py index 127e708..15dbd43 100644 --- a/lglpy/server.py +++ b/lglpy/server.py @@ -21,24 +21,31 @@ # SOFTWARE. # ----------------------------------------------------------------------------- -# This module implements the server-side communications module that can -# accept client connections from the layer drivers, and dispatch them to -# handlers in the server. +# This module implements the server-side communications module that can accept +# client connections from a layer driver, and dispatch messages to registered +# service handler in the server. # -# This module currently only accepts a single connection from a layer at a time -# and runs in the context of the calling thread, so if it needs to run in the -# background the user must create a new thread to contain it. It is therefore -# not possible to implement pseudo-host-driven event loops if the layer is -# using multiple services concurrently - this needs threads per service. +# This module currently only accepts a single connection at a time and message +# handling is synchronous inside the server. It is therefore not possible to +# implement pseudo-host-driven event loops if the layer is using multiple +# services concurrently - this needs threads per service. import enum import socket import struct +from typing import Any, Optional class MessageType(enum.Enum): ''' The received message type. + + NOTE: Values defined by the protocol; do not change. + + Attributes: + TX_ASYNC: Message is an async client transmit, no response allowed. + TX: Message is a sync client transmit, no response allowed. + TX_RX: Message is a sync client transmit, response required. ''' TX_ASYNC = 0 TX = 1 @@ -47,63 +54,156 @@ class MessageType(enum.Enum): class Message: ''' - A decoded message header packet. + A decoded message header for a received message. - See the MessageHeader struct in comms_message.hpp for binary layout. + NOTE: Fields and sizes defined by the protocol; do not change. + + Attributes: + message_type: The type of the message sent by the client. + endpoint_id: The endpoint service address. + message_id: The message cookie to use in a TX_RX response. + payload_size: The size of the payload in bytes. + payload: The data payload. ''' - def __init__(self, header): - assert len(header) == 14, 'Header length is incorrect' + HEADER_LEN = 14 + + def __init__(self, header: bytes): + ''' + Populate a new message based on the header info. + + Args: + header: The header byte stream. + ''' + assert len(header) == Message.HEADER_LEN, 'Header length is incorrect' fields = struct.unpack(' None: + ''' + Attach a payload to this message. + + Args: + Data: The payload byte stream. + ''' self.payload = data + class Response: ''' - An encoded message header packet. + An encoded message header for a message to be transmitted. - See the MessageHeader struct in comms_message.hpp for binary layout. + NOTE: Fields and sizes defined by the protocol; do not change. + + Attributes: + message_type: The type of the message sent by the client. + message_id: The message cookie to use in a TX_RX response. + payload_size: The size of the payload in bytes. ''' - def __init__(self, message, data): + def __init__(self, message: Message, data: bytes): + ''' + Populate a message header for a response. + Args: + message: The message we are responding to. + data: The response payload byte stream. + ''' self.message_type = message.message_type self.message_id = message.message_id self.payload_size = len(data) - def get_header(self): - data = struct.pack(' bytes: + ''' + Get the header byte stream for this response. + + Returns: + The response header byte stream. + ''' + return struct.pack(' str: - return 'registry' + self.sockl = None # type: Optional[socket.socket] + self.sockd = None # type: Optional[socket.socket] def register_endpoint(self, endpoint) -> int: + ''' + Register a new service endpoint with the server. + + Args: + endpoint: The endpoint object that can handle messages. + + Returns: + The assigned endpoint address. + ''' endpoint_id = len(self.endpoints) self.endpoints[endpoint_id] = endpoint return endpoint_id - def handle_message(self, message: Message): + def get_service_name(self) -> str: + ''' + Get the name of the self-hosted registry microservice. + + Returns: + The name of the service endpoint. + ''' + return 'registry' + + def handle_message(self, message: Message) -> Optional[bytes]: + ''' + Handle a message in the self-hosted registry microservice. + + Returns: + The response to the message. + ''' data = [] for endpoint_id, endpoint in self.endpoints.items(): name = endpoint.get_service_name().encode('utf-8') @@ -112,87 +212,119 @@ def handle_message(self, message: Message): return b''.join(data) - def run(self): - listen_sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listen_sockfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - listen_sockfd.bind(('localhost', self.port)) - listen_sockfd.listen(1) + def run(self) -> None: + ''' + Enter server connection handler run loop. + ''' + self.sockl = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sockl.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.listen_sockfd = listen_sockfd + # Set up the listening socket + self.sockl.bind(('localhost', self.port)) + self.sockl.listen(1) - # Accept connections from outside + # Accept a new client connection and assign a data socket while not self.shutdown: print('Waiting for client connection') try: - sockfd, _ = listen_sockfd.accept() + self.sockd, _ = self.sockl.accept() + print(' + Client connected') + + self.run_client() + + print(' + Client disconnected') + self.sockd.close() + self.sockd = None + + except ClientDropped: + print(' + Client disconnected') + if self.sockd: + self.sockd.close() + self.sockd = None + except OSError: continue - self.data_sockfd = sockfd - print(' + Client connected') - - while not self.shutdown: - # Read the header - data = self.receive_data(sockfd, 14) - if not data: - break - message = Message(data) - - if message.payload_size: - # Read the payload - data = self.receive_data(sockfd, message.payload_size) - if not data: - break - message.add_payload(data) - - # Dispatch to a handler - endpoint = self.endpoints[message.endpoint_id] - response = endpoint.handle_message(message) - - # Send a response for all TX_RX messages - if message.message_type == MessageType.TX_RX: - header = Response(message, response) - sent = self.send_data(sockfd, header.get_header()) - if not sent: - break - sent = self.send_data(sockfd, response) - if not sent: - break - - sockfd.close() - self.data_sockfd = None - - listen_sockfd.close() - self.listen_sockfd = None - - def stop(self): + self.sockl.close() + self.sockl = None + + def run_client(self) -> None: + ''' + Enter client message handler run loop. + + Raises: + ClientDropped: The client disconnected from the socket. + ''' + while not self.shutdown: + # Read the header + data = self.receive_data(Message.HEADER_LEN) + message = Message(data) + + # Read the payload if there is one + if message.payload_size: + data = self.receive_data(message.payload_size) + message.add_payload(data) + + # Dispatch to a service handler + endpoint = self.endpoints[message.endpoint_id] + response = endpoint.handle_message(message) + + # Send a response for all TX_RX messages + if message.message_type == MessageType.TX_RX: + header = Response(message, response) + self.send_data(header.get_header()) + self.send_data(response) + + def stop(self) -> None: + ''' + Shut down the server. + ''' self.shutdown = True - if self.listen_sockfd is not None: - self.listen_sockfd.close() + if self.sockl is not None: + self.sockl.close() - if self.data_sockfd is not None: - self.data_sockfd.shutdown(socket.SHUT_RDWR) + if self.sockd is not None: + self.sockd.shutdown(socket.SHUT_RDWR) - def receive_data(self, sockfd, byte_count): - data = b'' + def receive_data(self, size: int) -> bytes: + ''' + Fetch a fixed size packet from the socket. + + Args: + size: The length of the packet in bytes. + + Returns: + The packet data. - while len(data) < byte_count: - new_data = sockfd.recv(byte_count - len(data)) + Raises: + ClientDropped: The client disconnected from the socket. + ''' + assert self.sockd is not None + + data = b'' + while len(data) < size: + new_data = self.sockd.recv(size - len(data)) if not new_data: - print(" - Client disconnected") - return None + raise ClientDropped() data = data + new_data return data - def send_data(self, sockfd, data): + def send_data(self, data: bytes) -> None: + ''' + Send a fixed size packet to the socket. + + Args: + data: The binary data to send. + + Raises: + ClientDropped: The client disconnected from the socket. + ''' + assert self.sockd is not None + while len(data): - sent_bytes = sockfd.send(data) + sent_bytes = self.sockd.send(data) if not sent_bytes: - print(" - Client disconnected") - return False + raise ClientDropped() data = data[sent_bytes:] - - return True From 34043eaf1cfeb084fc54654e010062df2d7a97f3 Mon Sep 17 00:00:00 2001 From: Peter Harris Date: Wed, 18 Dec 2024 19:44:24 +0000 Subject: [PATCH 2/3] Cleanup service code --- lgl_host_server.py | 8 ++- lglpy/service_gpu_timeline.py | 130 ++++++++++++++++++++++++---------- lglpy/service_log.py | 31 +++++--- lglpy/service_test.py | 30 +++++--- 4 files changed, 141 insertions(+), 58 deletions(-) diff --git a/lgl_host_server.py b/lgl_host_server.py index 3893fec..e565aaf 100644 --- a/lgl_host_server.py +++ b/lgl_host_server.py @@ -42,9 +42,11 @@ def main(): # Register all the services with it print(f'Registering host services:') - service = lglpy.service_test.TestService() - endpoint_id = server.register_endpoint(service) - print(f' - [{endpoint_id}] = {service.get_service_name()}') + + if 0: + service = lglpy.service_test.TestService() + endpoint_id = server.register_endpoint(service) + print(f' - [{endpoint_id}] = {service.get_service_name()}') service = lglpy.service_log.LogService() endpoint_id = server.register_endpoint(service) diff --git a/lglpy/service_gpu_timeline.py b/lglpy/service_gpu_timeline.py index 9ead818..fa5e6b8 100644 --- a/lglpy/service_gpu_timeline.py +++ b/lglpy/service_gpu_timeline.py @@ -22,80 +22,134 @@ # ----------------------------------------------------------------------------- # This module implements the server-side communications module service that -# implements a basic message endpoint for testing. +# handles record preprocessing and serializing GPU Timeline layer messages to +# file on the host. -from lglpy.server import Message import json import struct +from typing import Any + +from lglpy.server import Message + class GPUTimelineService: def __init__(self): + ''' + Initialize the timeline service. + + Returns: + The endpoint name. + ''' + # Create a default frame record self.frame = { "frame": 0, "workloads": [] } # TODO: Make file name configurable - self.fileHandle = open('malivision.gputl', 'wb') + self.file_handle = open('malivision.gputl', 'wb') def get_service_name(self) -> str: + ''' + Get the service endpoint name. + + Returns: + The endpoint name. + ''' return 'GPUTimeline' - def handle_frame(self, msg): + def handle_frame(self, msg: Any) -> None: + ''' + Handle a frame boundary workload. + + This will write the current frame record to the output file, and then + reset the frame tracker ready for the next frame. + + Args: + msg: The Python decode of a JSON payload. + ''' # Write frame packet to the file - lastFrame = json.dumps(self.frame).encode('utf-8') - length = struct.pack(' None: + ''' + Handle a render pass workload. + + Render passes may generate multiple messages if suspended and resumed + when using Vulkan 1.3 dynamic render passes, so merge those into a + single workload. + + Args: + msg: The Python decode of a JSON payload. + ''' # Find the last workload - lastRenderPass = None + last_render_pass = None if len(self.frame['workloads']): - lastWorkload = self.frame['workloads'][-1] - if lastWorkload['type'] == 'renderpass': - lastRenderPass = lastWorkload - - # Continuation - if lastRenderPass and lastRenderPass['tid'] == msg['tid']: - # Don't accumulate if tagID is not unique metadata tag - if lastRenderPass['drawCallCount'] != -1: - lastRenderPass['drawCallCount'] += msg['drawCallCount'] - # New render pass + last_workload = self.frame['workloads'][-1] + if last_workload['type'] == 'renderpass': + last_render_pass = last_workload + + # If this is a continuation then merge records + if last_render_pass and (last_render_pass['tid'] == msg['tid']): + # Don't accumulate if tagID is flagged as ambiguous + if last_render_pass['drawCallCount'] != -1: + last_render_pass['drawCallCount'] += msg['drawCallCount'] + + # Otherwise this is a new record else: self.frame['workloads'].append(msg) - def handle_generic(self, msg): + def handle_generic(self, msg: Any) -> None: + ''' + Handle a generic workload that needs no special handling. + + Args: + msg: The Python decode of a JSON payload. + ''' self.frame['workloads'].append(msg) - def handle_message(self, message: Message): - payload = message.payload.decode('utf-8') - parsedPayload = json.loads(payload) + def handle_message(self, message: Message) -> None: + ''' + Handle a service request from a layer. + + Note that this service only expects pushed TX or TX_ASYNC messages, so + never provides a response. + ''' + encoded_payload = message.payload.decode('utf-8') + payload = json.loads(encoded_payload) + + generic_payload_types = { + 'dispatch', + 'tracerays', + 'imagetransfer', + 'buffertransfer' + } - payloadType = parsedPayload['type'] + payload_type = payload['type'] - if payloadType == 'frame': - self.handle_frame(parsedPayload) + if payload_type == 'frame': + self.handle_frame(payload) - elif payloadType == 'renderpass': - self.handle_renderpass(parsedPayload) + elif payload_type == 'renderpass': + self.handle_render_pass(payload) - elif payloadType in ('dispatch', 'tracerays', 'imagetransfer', 'buffertransfer'): - self.handle_generic(parsedPayload) + elif payload_type in generic_payload_types: + self.handle_generic(payload) else: - assert False, f'Unknown payload type {payloadType}' - - return None + assert False, f'Unknown payload type {payload_type}' diff --git a/lglpy/service_log.py b/lglpy/service_log.py index 72a50d8..aaf41cf 100644 --- a/lglpy/service_log.py +++ b/lglpy/service_log.py @@ -24,19 +24,32 @@ # This module implements the server-side communications module service that # implements basic logging. +from lglpy.server import Message + + class LogService: ''' - A decoded message header packet. - - See the MessageHeader struct in comms_message.hpp for binary layout. + A simple service used for remote logging to bypass logcat. ''' - def __init__(self): - pass + def get_service_name(self) -> str: + ''' + Get the service endpoint name. - def get_service_name(self): + Returns: + The endpoint name. + ''' return 'log' - def handle_message(self, message): - log_entry = payload.decode(encoding='utf-8') - print(log_entry) + def handle_message(self, message: Message) -> None: + ''' + Handle a service request from a layer. + + + Returns: + This service only expects pushed TX or TX_ASYNC messages, so never + provides a response. + ''' + # Print received payloads + payload = message.payload.decode('utf-8') + print(payload) diff --git a/lglpy/service_test.py b/lglpy/service_test.py index 24b28bc..ac44a6d 100644 --- a/lglpy/service_test.py +++ b/lglpy/service_test.py @@ -25,23 +25,37 @@ # implements a basic message endpoint for testing. from lglpy.server import Message, MessageType +from typing import Optional -class TestService: - def __init__(self): - pass +class TestService: + ''' + A simple service used for testing. + ''' def get_service_name(self) -> str: + ''' + Get the service endpoint name. + + Returns: + The endpoint name. + ''' return 'test' - def handle_message(self, message: Message): - payload = message.payload.decode('utf-8') + def handle_message(self, message: Message) -> Optional[bytes]: + ''' + Handle a service request from a layer. + Returns: + The response if message is a TX_RX message, None otherwise. + ''' + # Print received payloads + payload = message.payload.decode('utf-8') print(f'{message.message_type.name}: {payload} ({len(payload)} bytes)') + # Reverse payloads for response to TX_RX messages if message.message_type == MessageType.TX_RX: response = payload[::-1] - response = response.encode('utf-8') - return response + return response.encode('utf-8') - return '' + return None From 8ea3febadad19636e76a4b9be99319d6a7609b13 Mon Sep 17 00:00:00 2001 From: Peter Harris Date: Wed, 18 Dec 2024 19:44:40 +0000 Subject: [PATCH 3/3] Add Python style to Actions runner --- .github/workflows/build_test.yaml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/.github/workflows/build_test.yaml b/.github/workflows/build_test.yaml index 60caba9..43b5370 100644 --- a/.github/workflows/build_test.yaml +++ b/.github/workflows/build_test.yaml @@ -13,6 +13,28 @@ on: - main jobs: + python-test: + name: Python tests + runs-on: ubuntu-22.04 + steps: + - name: Git checkout + uses: actions/checkout@v4 + with: + submodules: 'true' + + - name: Get Python modules + run: | + python3 -m pip install --upgrade pip + python3 -m pip install pycodestyle mypy + + - name: Check code style + run: | + python3 -m pycodestyle ./lglpy + + - name: Check typing + run: | + python3 -m mypy ./lglpy + build-ubuntu-x64-clang: name: Ubuntu x64 Clang runs-on: ubuntu-22.04