Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve message routing #304

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

Improve message routing #304

wants to merge 1 commit into from

Conversation

empicano
Copy link
Owner

@empicano empicano commented Jun 2, 2024

This proposes a simpler way to filter messages and structure message handling.

Unsorted points that I considered:

  • This lets us split message handling into multiple files via multiple routers (without circular imports)
  • This lets us use the client inside a handler function, to e.g. publish a message back in a request/response fashion
  • We can dynamically subscribe and unsubscribe
  • The values of wildcards (+/#) of the topic filter are automatically available as *args in the handler function
  • We still only have a single message queue (easier for newcomers, concurrency could be implemented as shown below, optionally with priority queue)
  • We can still pass a non-default queue to the client to prioritize the handling of certain messages
  • We still have flexibility to not use the routers, but handle the messages directly. Routers are a natural development, once the application gets too complex we can iteratively add them
  • We are still flexible enough to process messages concurrently in an individual way (i.e. our message loop is still transparent)
  • It's a non-breaking change

The interface looks like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async for message in client.messages:
        await client.route(message)

Where we can process messages concurrently e.g. like this:

router = aiomqtt.Router()


@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async def work(client):
    async for message in client._messages():
        await client.route(message)


async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
    await client.subscribe("humidity/#")
    async with asyncio.TaskGroup() as tg:
        tg.create_task(work(client))
        tg.create_task(work(client))

Glad to hear feedback on this 🙂

@empicano
Copy link
Owner Author

empicano commented Jul 30, 2024

Another possibility would be:

async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)


async with aiomqtt.Client(
    "test.mosquitto.org",
    handlers={"humidity/+/inside/#": handle}
) as client:
    await client.subscribe("humidity/#")
    async for message in client.messages:
        await client.route(message)
  • This could spare users the possible confusion of "Do I use a router or a client, what's the difference?"
  • What's nice about the other approach is that the filter (humidity/+/inside/#) is written directly next to the handler function, which makes it immediately clear which messages are handled in this function without jumping around in the code. On the other hand, with this option we have all the filters in one place. Probably just comes down to preference.
  • This avoids the problem of defining two routers with overlap in filters

I currently prefer this option.


Aside: We should be able to optimize message to handler matching with a trie.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant