Skip to content


Merge pull request #3 from schrd/rework_queues
Browse files Browse the repository at this point in the history
Rework queues
  • Loading branch information
tibroc authored Apr 24, 2024
2 parents f5d7d73 + 31c744a commit d0841b4
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 105 deletions.
2 changes: 1 addition & 1 deletion pairing-server/
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ pip install -r requirements-test.txt
2. Execute tests
python -m pytest
174 changes: 79 additions & 95 deletions pairing-server/
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import asyncio
import aioredis
from async_timeout import timeout
from quart import Quart, websocket, send_file, request
import json
import random
import traceback
from atomicx import AtomicInt

app = Quart(__name__)

redis_url = 'redis://'
pin_rollover = 600

to_plugin = {}
to_room = {}
pin_to_room = {}
room_config = {}
room_connections = AtomicInt()

def validate_client_config(config) -> bool:
return True
if type(config) != list:
Expand All @@ -20,27 +25,25 @@ def validate_client_config(config) -> bool:
return False
return True

async def generate_pin(redis):
def generate_pin():
while True:
pin = random.randrange(1e5, 1e6 - 1)
redis_key = f'bbb:pins:{pin}'
has_key = await redis.get(redis_key)
if has_key is None:
pin_free = pin not in pin_to_room
if pin_free:
return pin

async def generate_pin_task(channel, config):
print(f"pin task: started {channel}")
redis = await aioredis.from_url(redis_url)
pub = await aioredis.from_url(redis_url)
async def generate_pin_task(connection_id):
print(f"pin task: started {connection_id}")
q = to_room[connection_id]
while True:
pin = await generate_pin(redis)
print(f"pin task: generated pin {pin} for {channel}")
await pub.publish(channel, json.dumps({'pin': pin, 'timeout': pin_rollover}))
pin = generate_pin()
await q.put({'pin': pin, 'timeout': pin_rollover})
print(f"pin task: generated pin {pin} for {connection_id}")
await asyncio.sleep(pin_rollover)
await asyncio.sleep(0.0)
except asyncio.CancelledError:
print(f"pin task: cancelled for channel {channel}")
print(f"pin task: cancelled for channel {connection_id}")
except Exception as e:
print("exception 2")
Expand All @@ -49,105 +52,101 @@ async def generate_pin_task(channel, config):

async def handle_room() -> None:
redis = await aioredis.from_url(redis_url)
psub = None
pin_task = None
forward_task = None
connection_id = None
last_pin = None
data = await websocket.receive()
data = json.loads(data)
if not 'config' in data:
config = data['config']
client_id = await redis.client_id()
psub = redis.pubsub()
channel_name = f'bbb:client:{client_id}'
pin_task = asyncio.create_task(generate_pin_task(channel_name, config), name=f"pin_generate_{client_id}")
async with psub as p:
await p.subscribe(channel_name)
while True:
async for msg in p.listen():
print(f"room received: {msg}")
if msg['type'] != 'message':
if msg['data'] is not None:
msg = json.loads(msg['data'])
if 'pin' in msg:
pin = msg['pin']
await redis.set(f'bbb:pins:{pin}', json.dumps(config), ex=pin_rollover)
await redis.set(f'bbb:clients:{pin}', client_id, ex=pin_rollover)
await websocket.send(json.dumps({'action': 'new_pin', 'pin': pin, 'timeout': pin_rollover}))
elif 'start' in msg:
await websocket.send(json.dumps({'action': 'start', 'urls': msg['urls'], 'pairing_pin': msg['pairing_pin']}))
plugin_channel = f"bbb:client:{msg['client_id']}"
forward_task = asyncio.create_task(forward_from_websocket(plugin_channel, websocket))
elif 'stop_pin_generation' in msg:
elif 'start_pin_generation' in msg:
if pin_task is not None or pin_task.cancelled():
pin_task = asyncio.create_task(generate_pin_task(channel_name, config))
except asyncio.TimeoutError:
print("room timeout")
await p.unsubscribe(channel_name)
connection_id =
room_config[connection_id] = config
print(f'room config: {connection_id} is {config}')
to_room[connection_id] = asyncio.Queue()
pin_task = asyncio.create_task(generate_pin_task(connection_id), name=f"pin_generate_{connection_id}")
while True:
msg = await to_room[connection_id].get()
print(f"room queue received {msg}")
if 'pin' in msg:
pin = msg['pin']
if last_pin:
del pin_to_room[last_pin]
last_pin = pin
pin_to_room[pin] = connection_id
await websocket.send(json.dumps({'action': 'new_pin', 'pin': pin, 'timeout': pin_rollover}))
elif 'start' in msg:
await websocket.send(json.dumps({'action': 'start', 'urls': msg['urls'], 'pairing_pin': msg['pairing_pin']}))
forward_task = asyncio.create_task(forward_from_queue_to_websocket(to_room[connection_id], websocket))
elif 'stop_pin_generation' in msg:
elif 'start_pin_generation' in msg:
if pin_task is not None or pin_task.cancelled():
pin_task = asyncio.create_task(generate_pin_task(connection_id), name=f"pin_generate_{connection_id}")

print("room close")
await redis.close()
if psub:
await psub.close()
if pin_task:
if forward_task:

async def send_client(client_id, redis, msg):
print(f"publish to {client_id}: {msg}")
await redis.publish(f"bbb:client:{client_id}", json.dumps(msg))
if connection_id:
q = to_room[connection_id]
while not q.empty():
await q.get()
del to_room[connection_id]
del q

async def handle_ws() -> None:
pin_generation_stopped = False
redis = await aioredis.from_url(redis_url)
# pin provided by bbb plugin
pin = None
# verification pin after conn established
pairing_pin = random.randrange(1e3, 1e4 - 1)
my_client_id = await redis.client_id()
forward_task = None
data = await websocket.receive()
data = json.loads(data)
data = await websocket.receive_json()
#data = json.loads(data)
print(f"plugin received {data}")
pin = data['pin']
config = await redis.get(f'bbb:pins:{pin}')
client_id = await redis.get(f'bbb:clients:{pin}')
if config is None or client_id is None:
if not pin in pin_to_room:
await websocket.send(json.dumps({'status': 404, 'msg': 'PIN not found'}))
room_connection_id = pin_to_room[pin]
config = room_config.get(room_connection_id)
print(f"plugin found config {config}")
if config is None:
print("no config")
await websocket.send(json.dumps({'status': 500, 'msg': 'client config not found. This is a bug'}))
if not validate_client_config(config):
await websocket.send(json.dumps({'status': 500, 'msg': 'invalid client config'}))
client_id = int(client_id)
print("config is valid")
print("sende stop_pin_generation")
await send_client(client_id, redis, {'stop_pin_generation': True})
to_room_queue = to_room[room_connection_id]
await to_room_queue.put({'stop_pin_generation': True})
pin_generation_stopped = True
await websocket.send(json.dumps({'status': 200, 'msg': 'ok', 'config': json.loads(config)}))
await websocket.send_json({'status': 200, 'msg': 'ok', 'config': config})
# jetzt bekommen wir die URLs
print("Warte auf URLS")
data = await websocket.receive()
data = await websocket.receive_json()
print(f"empfangen: {data}")
data = json.loads(data)
if not 'urls' in data:
await websocket.send(json.dumps({'status': 500, 'msg': 'invalid format. Expecting urls'}))
await send_client(client_id, redis, {'start': True, 'urls': data['urls'], 'pairing_pin': pairing_pin, 'client_id': my_client_id})
to_plugin[pin] = asyncio.Queue()
await to_room_queue.put({'start': True, 'urls': data['urls'], 'pairing_pin': pairing_pin})
await websocket.send(json.dumps({'status': 200, 'msg': 'pairing', 'pairing_pin': pairing_pin}))
channel_name = f'bbb:client:{my_client_id}'
forward_task = asyncio.create_task(forward_from_pubsub(channel_name, websocket))
forward_task = asyncio.create_task(forward_from_queue_to_websocket(to_plugin[pin], websocket))
while True:
data = await websocket.receive()
data = json.loads(data)
await send_client(client_id, redis, data)
await to_room.put(data)

# wait for session terminate

Expand All @@ -160,42 +159,27 @@ async def handle_ws() -> None:
print("closing plugin")
if pin_generation_stopped:
await send_client(client_id, redis, {'start_pin_generation': True})
await to_room_queue.put({'start_pin_generation': True})
if forward_task:
await redis.close()
if pin in to_plugin:
while not to_plugin[pin].empty():
await to_plugin[pin].get()
del to_plugin[pin]
await websocket.close(1007)
print("closed plugin")

async def forward_from_websocket(channel, websocket):
redis = await aioredis.from_url(redis_url)
async def forward_from_queue_to_websocket(q, websocket):
while True:
data = await websocket.receive()
data = json.loads(data)
data = await q.get()
if 'type' in data and data['type'] == 'ping':
await redis.publish(channel, json.dumps(data))
await websocket.send_json(data)
except asyncio.CancelledError:

async def forward_from_pubsub(channel, websocket):
redis = await aioredis.from_url(redis_url)
psub = redis.pubsub()
async with psub as p:
await p.subscribe(channel)
while True:
async for msg in p.listen():
print(f"{channel} received: {msg}")
if msg['type'] != 'message':
if msg['data'] is not None:
await websocket.send(msg['data'].decode('utf-8'))
except asyncio.CancelledError:

def run() -> None:
4 changes: 2 additions & 2 deletions pairing-server/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#testpaths =
# test
testpaths =
1 change: 1 addition & 0 deletions pairing-server/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
-r requirements.txt
2 changes: 1 addition & 1 deletion pairing-server/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
30 changes: 24 additions & 6 deletions pairing-server/test/
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import pytest
from async_timeout import timeout
from quart.testing.connections import WebsocketDisconnectError

from app import app
import json
Expand All @@ -16,19 +17,36 @@ async def run(*args, **kwargs):
async def test_room_get_pin():
test_client = app.test_client()
async with test_client.websocket('/ws_room') as test_websocket:
await test_websocket.send(json.dumps({'config': {'foo': 'bar'}}))
response = await test_websocket.receive()
client_room = app.test_client()
async with client_room.websocket('/ws_room') as ws_room:
await ws_room.send(json.dumps({'config': {'foo': 'bar'}}))
response = await ws_room.receive()
response = json.loads(response)
assert 'pin' in response
assert 'timeout' in response
assert type(response['pin']) == int
assert type(response['timeout']) == int
await test_websocket.close(1000)
#raise RuntimeError("foo")
await ws_room.close(1000)
await asyncio.sleep(0.01)

async def test_room_no_config():
client_room = app.test_client()
async with client_room.websocket('/ws_room') as ws_room:
await ws_room.send(json.dumps({}))
with pytest.raises(WebsocketDisconnectError):
response = await ws_room.receive()

async def test_room_no_json():
client_room = app.test_client()
async with client_room.websocket('/ws_room') as ws_room:
await ws_room.send(b'foobar')
with pytest.raises(WebsocketDisconnectError):
response = await ws_room.receive()

async def test_room_join_cycle():
Expand Down

0 comments on commit d0841b4

Please sign in to comment.