Skip to content

Commit

Permalink
fix: don't send retained messages when granted QoS is 128 and set gra…
Browse files Browse the repository at this point in the history
…nted QoS in `subscribe` event subs (#720)
  • Loading branch information
robertsLando authored Feb 15, 2022
1 parent 1743806 commit 51a13ea
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 21 deletions.
50 changes: 30 additions & 20 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ function _dedupe (subs) {
function handleSubscribe (client, packet, restore, done) {
packet.subscriptions = packet.subscriptions.length === 1 ? packet.subscriptions : _dedupe(packet.subscriptions)
client.broker._parallel(
new SubscribeState(client, packet, restore, done),
doSubscribe,
packet.subscriptions,
restore ? done : completeSubscribe)
new SubscribeState(client, packet, restore, done), // what will be this in the functions
doSubscribe, // function to call
packet.subscriptions, // first argument of the function
restore ? done : completeSubscribe // the function to be called when the parallel ends
)
}

function doSubscribe (sub, done) {
Expand Down Expand Up @@ -194,13 +195,20 @@ function completeSubscribe (err) {
}

const broker = client.broker

// subscriptions array to return as result in 'subscribe' event and $SYS
const subs = packet.subscriptions

// topics we need to retrieve retain values from
const topics = []

for (let i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
subs.qos = this.subState[i].granted
// skip topics that are not allowed
if (this.subState[i].granted !== 128) {
topics.push(subs[i].topic)
}
// set granted qos to subscriptions
subs[i].qos = this.subState[i].granted
}

this.subState = []
Expand All @@ -218,20 +226,22 @@ function completeSubscribe (err) {
// Restored sessions should not contain any retained message.
// Retained message should be only fetched from SUBSCRIBE.

const persistence = broker.persistence
const stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through(function sendRetained (packet, enc, cb) {
packet = new Packet({
cmd: packet.cmd,
qos: packet.qos,
topic: packet.topic,
payload: packet.payload,
retain: true
}, broker)
// this should not be deduped
packet.brokerId = null
client.deliverQoS(packet, cb)
}))
if (topics.length > 0) {
const persistence = broker.persistence
const stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through(function sendRetained (packet, enc, cb) {
packet = new Packet({
cmd: packet.cmd,
qos: packet.qos,
topic: packet.topic,
payload: packet.payload,
retain: true
}, broker)
// this should not be deduped
packet.brokerId = null
client.deliverQoS(packet, cb)
}))
}
}

function noop () { }
Expand Down
14 changes: 13 additions & 1 deletion test/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ test('negate subscription', function (t) {
})

test('negate multiple subscriptions', function (t) {
t.plan(5)
t.plan(6)

const s = connect(setup())
t.teardown(s.broker.close.bind(s.broker))
Expand All @@ -771,6 +771,18 @@ test('negate multiple subscriptions', function (t) {
cb(null, null)
}

const expectedSubs = [{
topic: 'hello',
qos: 128
}, {
topic: 'world',
qos: 128
}]

s.broker.once('subscribe', function (subs, client) {
t.same(subs, expectedSubs)
})

s.inStream.write({
cmd: 'subscribe',
messageId: 24,
Expand Down
37 changes: 37 additions & 0 deletions test/retain.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,43 @@ test('reconnected subscriber will not receive retained messages when QoS 0 and c
})
})

test('subscriber will not receive retained messages when QoS is 128', function (t) {
t.plan(3)

const broker = aedes()
t.teardown(broker.close.bind(broker))

const pubPacket = {
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
retain: true,
messageId: 42
}

broker.authorizeSubscribe = function (client, sub, callback) {
if (sub.topic === pubPacket.topic) {
callback(null, null)
} else {
callback(null, sub)
}
}

const publisher = connect(setup(broker), { clean: true })

publisher.inStream.write(pubPacket)

publisher.outStream.on('data', function (packet) {
const subscriber = connect(setup(broker), { clean: true })
subscribe(t, subscriber, pubPacket.topic, 128, function () {
subscriber.outStream.on('data', function (packet) {
t.fail('should not received retain message')
})
})
})
})

// [MQTT-3.3.1-6]
test('new QoS 0 subscribers receive QoS 0 retained messages when clean', function (t) {
t.plan(9)
Expand Down

0 comments on commit 51a13ea

Please sign in to comment.