Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Shared subscriptions #449

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const defaultOptions = {
authorizeForward: defaultAuthorizeForward,
published: defaultPublished,
trustProxy: false,
sharedTopics: false,
trustedProxies: [],
queueLimit: 42,
maxClientsIdLength: 23
Expand Down Expand Up @@ -75,6 +76,8 @@ function Aedes (opts) {
this.trustProxy = opts.trustProxy
this.trustedProxies = opts.trustedProxies

this.sharedTopics = opts.sharedTopics

this.clients = {}
this.brokers = {}

Expand Down
21 changes: 21 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,27 @@ function Client (broker, conn, req) {
}
}

this.deliverShared = function (_packet, cb) {
const sharedSub = that.subscriptions[_packet.topic]
// this function will return the next client that should send the update based on round robin algorithm
// get first subscription $share/<group>/<topic> with lower lastUpdate
that.broker.persistence.nextSharedSubscription(_packet.topic, sharedSub.group, function (err, subscription) {
if (err) {
cb(new Error('Error while fetching shared subscription: ' + err.message))
} else {
if (subscription.clientId === that.id) { // this client should send an update
that.deliverQoS(_packet, function (err) {
if (err) {
cb(new Error('Error while updating shared subscription: ' + err.message))
} else {
that.broker.persistence.updateSharedSubscription(that, _packet.topic, sharedSub.group, cb)
}
})
}
}
})
}

this._keepaliveTimer = null
this._keepaliveInterval = -1

Expand Down
49 changes: 38 additions & 11 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const fastfall = require('fastfall')
const Packet = require('aedes-packet')
const through = require('through2')
const { validateTopic } = require('../utils')
const { validateTopic, sharedTopic } = require('../utils')
const write = require('../write')

const subscribeTopicActions = fastfall([
Expand All @@ -27,6 +27,12 @@ function Subscription (qos, func) {
this.func = func
}

function SharedSubscription (qos, func, group) {
this.qos = qos
this.func = func
this.group = group
}

function SubscribeState (client, packet, restore, finish, granted) {
this.client = client
this.packet = packet
Expand Down Expand Up @@ -115,24 +121,45 @@ function addSubs (sub, done) {

const client = this.client
const broker = client.broker
const topic = sub.topic
var topic = sub.topic
const qos = sub.qos
var func = qos > 0 ? client.deliverQoS : client.deliver0
var group = null

// [MQTT-4.7.2-1]
if (isStartsWithWildcard(topic)) {
func = blockDollarSignTopics(func)
}

if (!client.subscriptions[topic]) {
client.subscriptions[topic] = new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else if (client.subscriptions[topic].qos !== qos) {
broker.unsubscribe(topic, client.subscriptions[topic].func)
client.subscriptions[topic] = new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else {
done()
if (broker.sharedTopics) {
const parsedTopic = sharedTopic(topic)
if (parsedTopic) {
topic = parsedTopic.topic
func = client.deliverShared
group = parsedTopic.group
broker.persistence.addSharedSubscription(client, sub, function addSub (err) {
if (err) {
done(err)
} else {
finish()
}
})
} else {
finish()
}
} else finish()

function finish () {
if (!client.subscriptions[topic]) {
client.subscriptions[topic] = group ? new SharedSubscription(qos, func, group) : new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else if (client.subscriptions[topic].qos !== qos) {
broker.unsubscribe(topic, client.subscriptions[topic].func)
client.subscriptions[topic] = group ? new SharedSubscription(qos, func, group) : new Subscription(qos, func)
broker.subscribe(topic, func, done)
} else {
done()
}
}
}

Expand Down
43 changes: 31 additions & 12 deletions lib/handlers/unsubscribe.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const write = require('../write')
const { validateTopic } = require('../utils')
const { validateTopic, sharedTopic } = require('../utils')

function UnsubscribeState (client, packet, finish) {
this.client = client
Expand Down Expand Up @@ -46,17 +46,36 @@ function actualUnsubscribe (client, packet, done) {
function doUnsubscribe (sub, done) {
const client = this.client
const broker = client.broker
const s = client.subscriptions[sub]

if (s) {
var func = s.func
delete client.subscriptions[sub]
broker.unsubscribe(
sub,
func,
done)
} else {
done()

if (broker.sharedTopics) {
const parsedTopic = sharedTopic(sub)
if (parsedTopic) {
sub = parsedTopic.topic
broker.persistence.removeSharedSubscription(client, sub, function removeSub (err) {
if (err) {
done(err)
} else {
finish()
}
})
} else {
finish()
}
} else finish()

function finish () {
const s = client.subscriptions[sub]

if (s) {
var func = s.func
delete client.subscriptions[sub]
broker.unsubscribe(
sub,
func,
done)
} else {
done()
}
}
}

Expand Down
14 changes: 13 additions & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ function validateTopic (topic, message) {
}
}

function sharedTopic (topic) {
if (!topic || !topic.startsWith('$share/')) return null

var group = topic.substring(7, topic.indexOf('/', 7))

return {
group: group,
topic: topic.substring(8 + group.length)
}
}

module.exports = {
validateTopic
validateTopic,
sharedTopic
}