Skip to content

Commit

Permalink
Implement stream management requesting ACKs
Browse files Browse the repository at this point in the history
Queue outgoing stanzas and periodically request ACK.  Remove from the
queue anything ack'd and notify of the ack so apps can know the stanza
has for sure sent.

On resume, anything not ack'd is re-sent.  On reconnect, anything not
ack'd notify of the failure to send this stanza so apps can know the
stanza failed.

Even when there is no traffic, send an <r/> at least every 5 minutes to check the
connection.  If there is no inbound traffic (such as an <a/>) within
timeout (default 60 seconds) then consider the connection disconnected.
  • Loading branch information
singpolyma committed Nov 14, 2023
1 parent 8ad208e commit e2f73fd
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 4 deletions.
77 changes: 74 additions & 3 deletions packages/stream-management/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";

const xml = require("@xmpp/xml");
const time = require("@xmpp/time");

// https://xmpp.org/extensions/xep-0198.html

Expand Down Expand Up @@ -46,24 +47,34 @@ module.exports = function streamManagement({
middleware,
}) {
let address = null;
let timeoutTimeout = null;

const sm = {
allowResume: true,
preferredMaximum: null,
enabled: false,
id: "",
outbound_q: [],
outbound: 0,
inbound: 0,
max: null,
timeout: 60000,
};

entity.on("online", (jid) => {
address = jid;
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty during online";
}
sm.outbound = 0;
sm.inbound = 0;
});

entity.on("offline", () => {
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
}
sm.outbound = 0;
sm.inbound = 0;
sm.enabled = false;
Expand All @@ -72,36 +83,93 @@ module.exports = function streamManagement({

middleware.use((context, next) => {
const { stanza } = context;
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (["presence", "message", "iq"].includes(stanza.name)) {
sm.inbound += 1;
} else if (stanza.is("r", NS)) {
// > When an <r/> element ("request") is received, the recipient MUST acknowledge it by sending an <a/> element to the sender containing a value of 'h' that is equal to the number of stanzas handled by the recipient of the <r/> element.
entity.send(xml("a", { xmlns: NS, h: sm.inbound })).catch(() => {});
} else if (stanza.is("a", NS)) {
// > When a party receives an <a/> element, it SHOULD keep a record of the 'h' value returned as the sequence number of the last handled outbound stanza for the current stream (and discard the previous value).
sm.outbound = stanza.attrs.h;
const oldOutbound = sm.outbound;
for (let i = 0; i < stanza.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
}
}

return next();
});

let requestAckTimeout = null;
function requestAck() {
if (timeoutTimeout) clearTimeout(timeoutTimeout);
if (sm.timeout) {
timeoutTimeout = setTimeout(() => entity.disconnect(), sm.timeout);
}
entity.send(xml("r", { xmlns: NS })).catch(() => {});
// Periodically send r to check the connection
// If a stanza goes out it will cancel this and set a sooner timer
requestAckTimeout = setTimeout(requestAck, 300000);
}

middleware.filter((context, next) => {
const { stanza } = context;
if (sm.enabled && ["presence", "message", "iq"].includes(stanza.name)) {
let qStanza = stanza;
if (
qStanza.name === "message" &&
!qStanza.getChild("delay", "urn:xmpp:delay")
) {
qStanza = xml.clone(stanza);
qStanza.c("delay", {
xmlns: "urn:xmpp:delay",
from: entity.jid.toString(),
stamp: time.datetime(),
});
}
sm.outbound_q.push(qStanza);
// Debounce requests so we send only one after a big run of stanza together
if (requestAckTimeout) clearTimeout(requestAckTimeout);
requestAckTimeout = setTimeout(requestAck, 100);
}
return next();
});

// https://xmpp.org/extensions/xep-0198.html#enable
// For client-to-server connections, the client MUST NOT attempt to enable stream management until after it has completed Resource Binding unless it is resuming a previous session

streamFeatures.use("sm", NS, async (context, next) => {
// Resuming
if (sm.id) {
try {
await resume(entity, sm.inbound, sm.id);
let resumed = await resume(entity, sm.inbound, sm.id);
sm.enabled = true;
entity.jid = address;
if (address) entity.jid = address;
entity.status = "online";
const oldOutbound = sm.outbound;
for (let i = 0; i < resumed.attrs.h - oldOutbound; i++) {
let stanza = sm.outbound_q.shift();
sm.outbound++;
entity.emit("stream-management/ack", stanza);
}
let q = sm.outbound_q;
sm.outbound_q = [];
for (const item of q) {
entity.send(item); // This will trigger the middleware and re-add to the queue
}
entity.emit("stream-management/resumed");
return true;
// If resumption fails, continue with session establishment
// eslint-disable-next-line no-unused-vars
} catch {
sm.id = "";
sm.enabled = false;
let stanza;
while ((stanza = sm.outbound_q.shift())) {
entity.emit("stream-management/fail", stanza);
}
sm.outbound = 0;
}
}
Expand All @@ -114,6 +182,9 @@ module.exports = function streamManagement({
const promiseEnable = enable(entity, sm.allowResume, sm.preferredMaximum);

// > The counter for an entity's own sent stanzas is set to zero and started after sending either <enable/> or <enabled/>.
if (sm.outbound_q.length > 0) {
throw "Stream Management assertion failure, queue should be empty after enable";
}
sm.outbound = 0;

try {
Expand Down
3 changes: 2 additions & 1 deletion packages/stream-management/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"management"
],
"dependencies": {
"@xmpp/xml": "^0.13.0"
"@xmpp/xml": "^0.13.0",
"@xmpp/time": "^0.13.0"
},
"engines": {
"node": ">= 14"
Expand Down
2 changes: 2 additions & 0 deletions packages/xml/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Element = require("ltx/lib/Element");
const createElement = require("ltx/lib/createElement");
const clone = require("ltx/lib/clone");
const Parser = require("./lib/Parser");
const {
escapeXML,
Expand All @@ -19,6 +20,7 @@ module.exports = xml;

Object.assign(module.exports, {
Element,
clone,
createElement,
Parser,
escapeXML,
Expand Down

0 comments on commit e2f73fd

Please sign in to comment.