Skip to content

Commit

Permalink
Implements #7
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinsbarnard committed Oct 12, 2023
1 parent d27999f commit c4ff75f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
9 changes: 8 additions & 1 deletion lcm_websocket_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import asyncio
import queue
from signal import SIGINT, SIGTERM
from urllib.parse import unquote

from websockets import serve

Expand Down Expand Up @@ -44,8 +45,14 @@ async def republish_lcm(websocket, path):
ip, port = websocket.remote_address[:2]
logger.info(f"Client connected from {ip}:{port} at {path}")

# Extract channel regex
channel_regex = unquote(path.lstrip('/'))
if not channel_regex: # empty path -> subscribe to all channels
channel_regex = '.*'

# Create an LCM observer for this client
observer = LCMObserver()
observer = LCMObserver(channel_regex=channel_regex)
logger.debug(f"Created LCM observer with channel regex {channel_regex}")

# Subscribe the observer to the LCM republisher
lcm_republisher.subscribe(observer)
Expand Down
24 changes: 21 additions & 3 deletions lcm_websocket_server/lcmpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lcm

import queue
import re
from threading import Thread

from lcm_websocket_server.log import get_logger
Expand All @@ -15,12 +16,28 @@ class LCMObserver:
"""
Observer for an LCMObservable. Puts received events in a thread-safe queue.
"""
def __init__(self):
def __init__(self, channel_regex: str = ".*"):
self._queue = queue.Queue()
self._channel_regex = channel_regex

def match(self, channel: str) -> bool:
"""
Check if the observer matches a given channel.
Args:
channel: Channel name
Returns:
True if the observer matches the channel, False otherwise.
"""
try:
return re.fullmatch(self._channel_regex, channel) is not None
except:
return False

def handle(self, event):
"""
Handles an LCM event.
Handle an LCM event.
"""
self._queue.put(event)

Expand Down Expand Up @@ -91,4 +108,5 @@ def _handler(self, channel, data):
data: The LCM data
"""
for subscriber in self._subscribers:
subscriber.handle((channel, data))
if subscriber.match(channel):
subscriber.handle((channel, data))

0 comments on commit c4ff75f

Please sign in to comment.