Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add/websocket #143

Merged
merged 44 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
0a06c51
add url validation check in RESTClientCommunicator constructor
AHReccese Oct 17, 2024
f12118a
minor refactoring in RESTServerCommunicator constructor
AHReccese Oct 17, 2024
cf28dcb
WebSocketClientCommunicator implemented
AHReccese Oct 18, 2024
d493b63
WebSocketServerCommunicator implemented
AHReccese Oct 18, 2024
15ca393
refactor imports and update
AHReccese Oct 18, 2024
dde0129
ClientCommunicator Enum defined
AHReccese Oct 18, 2024
31e1dbf
ServerCommunicator Enum defined
AHReccese Oct 18, 2024
8906811
`invalid url` and `websocket not connect` error messages added
AHReccese Oct 18, 2024
2698087
url validation utility functions added
AHReccese Oct 18, 2024
11b6b6f
scenario1 generalized to support various communication medium protocols
AHReccese Oct 18, 2024
670157c
scenario2 generalized to support various communication medium protocols
AHReccese Oct 18, 2024
ef9eb83
scenario3 generalized to support various communication medium protocols
AHReccese Oct 18, 2024
2b96c9b
`run_server` generalized to support various communication medium prot…
AHReccese Oct 18, 2024
3e92766
fixtures updated to support various communication medium protocols + …
AHReccese Oct 18, 2024
e8aac70
PyMiloClient updated to support Communication Medium Protocol selecti…
AHReccese Oct 18, 2024
10013c7
`PyMiloServer` updated to support Communication Medium Protocol selec…
AHReccese Oct 18, 2024
2aa1540
`websockets` added to streaming requirements
AHReccese Oct 18, 2024
e93ab57
`websockets` added to dev requirements
AHReccese Oct 18, 2024
f2f3483
remove secondary event loop creation
AHReccese Oct 18, 2024
9d4d7cf
lightweighting ml streaming testcases
AHReccese Oct 18, 2024
333b39d
add public module docstring
AHReccese Oct 18, 2024
e64604f
`autopep8.sh` applied
AHReccese Oct 18, 2024
57c239b
remove un-used imports + increase sleep time to lower connection refu…
AHReccese Oct 18, 2024
b478309
`CHANGELOG.md` updated
AHReccese Oct 18, 2024
8b3eaf0
create even loop if it doesn't exist
AHReccese Oct 18, 2024
86b1b4d
lowercasing the letters for the starting of docstring :param , :type …
AHReccese Oct 19, 2024
5682095
`CHANGELOG.md` updated
AHReccese Oct 19, 2024
95b9ae4
`CHANGELOG.md` updated
AHReccese Oct 22, 2024
5114d9e
fix `ClientCommunicator` sudden override by the `ClientCommunicationP…
AHReccese Oct 22, 2024
3ed9088
combine protocol enums
AHReccese Oct 22, 2024
e9fb8f5
I added `CommunicationProtocol` to the `__init__` file of the `stream…
AHReccese Oct 22, 2024
5e04b0c
Merge branch 'dev' into add/websocket
AHReccese Oct 22, 2024
3eb0b7a
`CHANGELOG.md` updated
AHReccese Nov 4, 2024
2c349d3
Merge branch 'add/websocket' of https://github.com/openscilab/pymilo …
AHReccese Nov 4, 2024
1a40101
`CHANGELOG.md` updated
AHReccese Nov 14, 2024
c0b4167
fixing versions
AHReccese Nov 14, 2024
e43570c
Merge branch 'dev' into add/websocket
AHReccese Nov 14, 2024
333ed09
remove scipy, pydantic
AHReccese Nov 17, 2024
042d0e0
convert websocket req to >=
AHReccese Nov 17, 2024
edb1a00
refactor check for socket close and add support for different `websoc…
AHReccese Nov 17, 2024
d5f70e2
update on `requirements.txt`
AHReccese Nov 17, 2024
d092d51
update on `requirements.txt`
AHReccese Nov 17, 2024
d2fea8e
`CHANGELOG.md` updated
AHReccese Nov 17, 2024
4d749e5
remove trailing whitespaces
AHReccese Nov 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]
### Added
- `validate_http_url` function in `streaming.util.py`
- `validate_websocket_url` function in `streaming.util.py`
- `ML Streaming` WebSocket testcases
- `CommunicationProtocol` Enum in `streaming.communicator.py`
- `WebSocketClientCommunicator` class in `streaming.communicator.py`
- `WebSocketServerCommunicator` class in `streaming.communicator.py`
- PyMilo exception types added in `pymilo/exceptions/__init__.py`
- PyMilo exception types added in `pymilo/__init__.py`
### Changed
- `client_communicator` parameter added to `PyMiloClient` class
sepandhaghighi marked this conversation as resolved.
Show resolved Hide resolved
- `server_communicator` parameter added to `PyMiloServer` class
sepandhaghighi marked this conversation as resolved.
Show resolved Hide resolved
- ML Streaming testcases updated to support protocol selection
sepandhaghighi marked this conversation as resolved.
Show resolved Hide resolved
- GitHub actions are limited to the `dev` and `main` branches
- `Python 3.13` added to `test.yml`
## [1.0] - 2024-09-16
Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ uvicorn==0.32.0
fastapi==0.115.2
requests==2.32.3
pydantic>=1.5.0
websockets>=9.0
sepandhaghighi marked this conversation as resolved.
Show resolved Hide resolved
setuptools>=40.8.0
vulture>=1.0
bandit>=1.5.1
Expand Down
1 change: 1 addition & 0 deletions pymilo/streaming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from .pymilo_client import PymiloClient
from .pymilo_server import PymiloServer
from .compressor import Compression
from .communicator import CommunicationProtocol
302 changes: 298 additions & 4 deletions pymilo/streaming/communicator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
# -*- coding: utf-8 -*-
"""PyMilo RESTFull Communication Mediums."""
"""PyMilo Communication Mediums."""
import json
import asyncio
import uvicorn
import requests
import websockets
from enum import Enum
from pydantic import BaseModel
from fastapi import FastAPI, Request
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from .interfaces import ClientCommunicator
from .param import PYMILO_INVALID_URL, PYMILO_CLIENT_WEBSOCKET_NOT_CONNECTED
from .util import validate_websocket_url, validate_http_url


class RESTClientCommunicator(ClientCommunicator):
Expand All @@ -19,6 +24,9 @@ def __init__(self, server_url):
:type server_url: str
:return: an instance of the Pymilo RESTClientCommunicator class
"""
is_valid, server_url = validate_http_url(server_url)
if not is_valid:
raise Exception(PYMILO_INVALID_URL)
self._server_url = server_url
self.session = requests.Session()
retries = requests.adapters.Retry(
Expand Down Expand Up @@ -96,10 +104,10 @@ def __init__(
:type port: int
:return: an instance of the Pymilo RESTServerCommunicator class
"""
self.app = FastAPI()
self._ps = ps
self.host = host
self.port = port
self._ps = ps
self.app = FastAPI()
self.setup_routes()

def setup_routes(self):
Expand Down Expand Up @@ -188,3 +196,289 @@ def parse(self, body):
def run(self):
"""Run internal fastapi server."""
uvicorn.run(self.app, host=self.host, port=self.port)


class WebSocketClientCommunicator(ClientCommunicator):
"""Facilitate working with the communication medium from the client side for the WebSocket protocol."""

def __init__(
self,
server_url: str = "ws://127.0.0.1:8000"
):
"""
Initialize the WebSocketClientCommunicator instance.

:param server_url: the WebSocket server URL to connect to.
:type server_url: str
:return: an instance of the Pymilo WebSocketClientCommunicator class
"""
is_valid, url = validate_websocket_url(server_url)
if not is_valid:
raise Exception(PYMILO_INVALID_URL)
self.server_url = url
self.websocket = None
self.connection_established = asyncio.Event() # Event to signal connection status
# check for even loop existance
if asyncio._get_running_loop() is None:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
else:
self.loop = asyncio.get_event_loop()
self.loop.run_until_complete(self.connect())

async def connect(self):
"""Establish a WebSocket connection with the server."""
if self.websocket is None or self.websocket.closed:
self.websocket = await websockets.connect(self.server_url)
print("Connected to the WebSocket server.")
self.connection_established.set()

async def disconnect(self):
"""Close the WebSocket connection."""
if self.websocket:
await self.websocket.close()

async def send_message(self, action: str, payload: dict) -> dict:
"""
Send a message to the WebSocket server.

:param action: the type of action to perform (e.g., 'download', 'upload').
:type action: str
:param payload: the payload associated with the action.
:type payload: dict
:return: the server's response as a JSON object.
"""
await self.connection_established.wait()

if self.websocket is None or self.websocket.closed:
raise RuntimeError(PYMILO_CLIENT_WEBSOCKET_NOT_CONNECTED)

message = json.dumps({"action": action, "payload": payload})
await self.websocket.send(message)
response = await self.websocket.recv()
return json.loads(response)

def download(self, payload: dict) -> dict:
"""
Request the remote ML model to download.

:param payload: the payload for the download request.
:type payload: dict
:return: the downloaded model data.
"""
response = self.loop.run_until_complete(
self.send_message("download", payload)
)
return response.get("payload")

def upload(self, payload: dict) -> bool:
"""
Upload the local ML model to the remote server.

:param payload: the payload for the upload request.
:type payload: dict
:return: true if the upload request is acknowledged.
"""
response = self.loop.run_until_complete(
self.send_message("upload", payload)
)
return response.get("message") == "Upload request received."

def attribute_call(self, payload: dict) -> dict:
"""
Delegate the requested attribute call to the remote server.

:param payload: the payload containing attribute call details.
:type payload: dict
:return: the server's response to the attribute call.
"""
response = self.loop.run_until_complete(
self.send_message("attribute_call", payload)
)
return response

def attribute_type(self, payload: dict) -> dict:
"""
Identify the attribute type of the requested attribute.

:param payload: the payload containing attribute type request.
:type payload: dict
:return: the server's response with the attribute type.
"""
response = self.loop.run_until_complete(
self.send_message("attribute_type", payload)
)
return response


class WebSocketServerCommunicator:
"""Facilitate working with the communication medium from the server side for the WebSocket protocol."""

def __init__(
self,
ps,
host: str = "127.0.0.1",
port: int = 8000,
):
"""
Initialize the WebSocketServerCommunicator instance.

:param ps: reference to the PyMilo server.
:type ps: pymilo.streaming.PymiloServer
:param host: the WebSocket server host address.
:type host: str
:param port: the WebSocket server port.
:type port: int
:return: an instance of the WebSocketServerCommunicator class.
"""
self._ps = ps
self.host = host
self.port = port
self.app = FastAPI()
self.active_connections: list[WebSocket] = []
self.setup_routes()

def setup_routes(self):
"""Configure the WebSocket endpoint to handle client connections."""
@self.app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
await self.connect(websocket)
try:
while True:
message = await websocket.receive_text()
await self.handle_message(websocket, message)
except WebSocketDisconnect:
self.disconnect(websocket)

async def connect(self, websocket: WebSocket):
"""
Accept a WebSocket connection and store it.

:param websocket: the WebSocket connection to accept.
:type websocket: webSocket
"""
await websocket.accept()
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
"""
Handle WebSocket disconnection.

:param websocket: the WebSocket connection to remove.
:type websocket: webSocket
"""
self.active_connections.remove(websocket)

async def handle_message(self, websocket: WebSocket, message: str):
"""
Handle messages received from WebSocket clients.

:param websocket: the WebSocket connection from which the message was received.
:type websocket: webSocket
:param message: the message received from the client.
:type message: str
"""
try:
message = json.loads(message)
action = message['action']
print(f"Server received action: {action}")
payload = self.parse(message['payload'])

if action == "download":
response = self._handle_download()
elif action == "upload":
response = self._handle_upload(payload)
elif action == "attribute_call":
response = self._handle_attribute_call(payload)
elif action == "attribute_type":
response = self._handle_attribute_type(payload)
else:
response = {"error": f"Unknown action: {action}"}

await websocket.send_text(json.dumps(response))
except Exception as e:
await websocket.send_text(json.dumps({"error": str(e)}))

def _handle_download(self) -> dict:
"""
Handle download requests.

:return: a response containing the exported model.
"""
return {
"message": "Download request received.",
"payload": self._ps.export_model(),
}

def _handle_upload(self, payload: dict) -> dict:
"""
Handle upload requests.

:param payload: the payload containing the model data to upload.
:type payload: dict
:return: a response indicating that the upload was processed.
"""
return {
"message": "Upload request received.",
"payload": self._ps.update_model(payload["model"]),
}

def _handle_attribute_call(self, payload: dict) -> dict:
"""
Handle attribute call requests.

:param payload: the payload containing the attribute call details.
:type payload: dict
:return: a response with the result of the attribute call.
"""
result = self._ps.execute_model(payload)
return {
"message": "Attribute call executed.",
"payload": result if result else "The ML model has been updated in place.",
}

def _handle_attribute_type(self, payload: dict) -> dict:
"""
Handle attribute type queries.

:param payload: the payload containing the attribute to query.
:type payload: dict
:return: a response with the attribute type and value.
"""
is_callable, field_value = self._ps.is_callable_attribute(payload)
return {
"message": "Attribute type query executed.",
"attribute type": "method" if is_callable else "field",
"attribute value": "" if is_callable else field_value,
}

def parse(self, message: str) -> dict:
"""
Parse the encrypted and compressed message.

:param message: the encrypted and compressed message to parse.
:type message: str
:return: the decrypted and extracted version of the message.
"""
return json.loads(
self._ps._compressor.extract(
self._ps._encryptor.decrypt(message)
)
)

def run(self):
"""Run the internal FastAPI server."""
uvicorn.run(self.app, host=self.host, port=self.port)


class CommunicationProtocol(Enum):
"""Communication protocol."""

REST = {
"CLIENT": RESTClientCommunicator,
"SERVER": RESTServerCommunicator,
}
WEBSOCKET = {
"CLIENT": WebSocketClientCommunicator,
"SERVER": WebSocketServerCommunicator,
}
2 changes: 2 additions & 0 deletions pymilo/streaming/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@
PYMILO_CLIENT_FAILED_TO_DOWNLOAD_REMOTE_MODEL = "PyMiloClient failed to download the remote ML model."

PYMILO_SERVER_NON_EXISTENT_ATTRIBUTE = "The requested attribute doesn't exist in this model."
PYMILO_INVALID_URL = "The given URL is not valid."
PYMILO_CLIENT_WEBSOCKET_NOT_CONNECTED = "WebSocket is not connected."
Loading
Loading