From a0fcb782c31648b1e3bbb4ed7f90e027408a7bed Mon Sep 17 00:00:00 2001 From: Owen Shoemaker Date: Tue, 14 May 2019 10:49:06 -0700 Subject: [PATCH] Added AWS IoT abstraction --- lib/ascoltatori.js | 4 +- lib/awsiot_ascoltatore.js | 189 ++++++++++++++++++++++++++++++++ package.json | 1 + test/awsiot_ascoltatore_spec.js | 114 +++++++++++++++++++ 4 files changed, 307 insertions(+), 1 deletion(-) create mode 100644 lib/awsiot_ascoltatore.js create mode 100644 test/awsiot_ascoltatore_spec.js diff --git a/lib/ascoltatori.js b/lib/ascoltatori.js index 0e4df04..616a2c5 100644 --- a/lib/ascoltatori.js +++ b/lib/ascoltatori.js @@ -16,6 +16,7 @@ module.exports.ZeromqAscoltatore = require("./zeromq_ascoltatore"); module.exports.AMQPAscoltatore = require("./amqp_ascoltatore"); module.exports.AMQPLibAscoltatore = require("./amqplib_ascoltatore"); module.exports.MQTTAscoltatore = require("./mqtt_ascoltatore"); +module.exports.AWSIoTAscoltatore = require("./awsiot_ascoltatore"); module.exports.PrefixAscoltatore = require("./prefix_acoltatore"); module.exports.MongoAscoltatore = require('./mongo_ascoltatore'); module.exports.DecoratorAscoltatore = require("./decorator_ascoltatore"); @@ -33,6 +34,7 @@ var classes = { "trie": module.exports.TrieAscoltatore, "eventemitter2": module.exports.EventEmitter2Ascoltatore, "mqtt": module.exports.MQTTAscoltatore, + "awsiot": module.exports.AWSIoTAscoltatore, "redis": module.exports.RedisAscoltatore, "zmq": module.exports.ZeromqAscoltatore, "mongo": module.exports.MongoAscoltatore, @@ -46,7 +48,7 @@ var classes = { * present. * The other options are passed through the constructor of the * Ascoltatore - * + * * Options: * - `type`, it can be "amqp", "trie", "eventemitter2", "redis", "zmq", or just a class * that will be instantiated (i.e. with `new`). diff --git a/lib/awsiot_ascoltatore.js b/lib/awsiot_ascoltatore.js new file mode 100644 index 0000000..43782b7 --- /dev/null +++ b/lib/awsiot_ascoltatore.js @@ -0,0 +1,189 @@ +"use strict"; + +var util = require("./util"); +var wrap = util.wrap; +var defer = util.defer; +var TrieAscoltatore = require("./trie_ascoltatore"); +var AbstractAscoltatore = require('./abstract_ascoltatore'); +var debug = require("debug")("ascoltatori:awsiot"); +var SubsCounter = require("./subs_counter"); +var steed = require("steed")(); + +/** + * AWSIoTAscoltatore is a class that inherits from AbstractAscoltatore. + * It is implemented through the `mqtt` package and it could be + * backed up by any MQTT broker out there. + * + * The options are: + * - `url`: the URL to connect to, as defined in https://www.npmjs.com/package/mqtt#connect + * - ... all the options defined in https://www.npmjs.com/package/mqtt#connect + * + * @api public + * @param {Object} opts The options object + */ +function AWSIoTAscoltatore(opts) { + AbstractAscoltatore.call(this, opts, { + separator: '/', + wildcardOne: '+', + wildcardSome: '#' + }); + + this._opts = opts || {}; + this._opts.keepalive = this._opts.keepalive || 3000; + this._opts.awsiot = this._opts.awsiot || require('aws-iot-device-sdk'); + + this._subs_counter = new SubsCounter(); + + this._ascoltatore = new TrieAscoltatore(opts); + this._startConn(); +} + +/** + * AWSIoTAscoltatore inherits from AbstractAscoltatore + * + * @api private + */ +AWSIoTAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype); + +/** + * Starts a new connection to an MQTT server. + * Do nothing if it is already started. + * + * @api private + */ +AWSIoTAscoltatore.prototype._startConn = function() { + var that = this; + + if (this._client === undefined) { + debug("connecting.."); + this._client = this._opts.awsiot.device(that._opts); + + this._client.setMaxListeners(0); + this._client.on("connect", function() { + debug("connected"); + that.reconnectTopics(function(){ + that.emit("ready"); + }); + }); + + this._client.on("message", function(topic, payload, packet) { + debug("received new packet on topic " + topic); + // we need to skip out this callback, so we do not + // break the client when an exception occurs + defer(function() { + that._ascoltatore.publish(that._recvTopic(topic), payload, packet); + }); + }); + this._client.on('error', function(e) { + debug("error in the client"); + + delete that._client; + that.emit("error", e); + }); + } + return this._client; +}; + +AWSIoTAscoltatore.prototype.reconnectTopics = function reconnectTopics(cb) { + var that = this; + + var subscribedTopics = that._subs_counter.keys(); + + var opts = { + qos: 1 + }; + + steed.each(subscribedTopics, function(topic, callback) { + that._client.subscribe(that._subTopic(topic), opts, function() { + debug("re-registered subscriber for topic " + topic); + callback(); + }); + }, function(){ + cb(); + }); + +}; + +AWSIoTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) { + this._raiseIfClosed(); + + if (!this._subs_counter.include(topic)) { + debug("registering new subscriber for topic " + topic); + + var opts = { + qos: 1 + }; + + this._client.subscribe(this._subTopic(topic), opts, function() { + debug("registered new subscriber for topic " + topic); + defer(done); + }); + + } else { + defer(done); + } + + this._subs_counter.add(topic); + this._ascoltatore.subscribe(topic, callback); +}; + +AWSIoTAscoltatore.prototype.publish = function publish(topic, message, options, done) { + this._raiseIfClosed(); + + this._client.publish(this._pubTopic(topic), message, { + qos: (options && (options.qos !== undefined)) ? options.qos : 1, + retain: (options && (options.retain !== undefined)) ? options.retain : false + }, function() { + debug("new message published to " + topic); + wrap(done)(); + }); +}; + +AWSIoTAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { + this._raiseIfClosed(); + + var newDone = null; + + newDone = function() { + debug("deregistered subscriber for topic " + topic); + defer(done); + }; + + this._ascoltatore.unsubscribe(topic, callback); + this._subs_counter.remove(topic); + + if (this._subs_counter.include(topic)) { + newDone(); + return; + } + + debug("deregistering subscriber for topic " + topic); + this._client.unsubscribe(this._subTopic(topic), newDone); +}; + +AWSIoTAscoltatore.prototype.close = function close(done) { + var that = this; + debug("closing"); + if (!this._closed) { + this._subs_counter.clear(); + this._client.once("close", function() { + debug("closed"); + that._ascoltatore.close(); + delete that._client; + that.emit("closed"); + defer(done); + }); + this._client.end(); + } else { + wrap(done)(); + } +}; + +util.aliasAscoltatore(AWSIoTAscoltatore.prototype); + +/** + * Exports the AWSIoTAscoltatore + * + * @api public + */ +module.exports = AWSIoTAscoltatore; diff --git a/package.json b/package.json index 46a078a..fbc058d 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "pre-commit": "^1.1.2" }, "dependencies": { + "aws-iot-device-sdk": "^2.2.1", "debug": "^2.2.0", "qlobber": "~0.7.0", "steed": "^1.1.3", diff --git a/test/awsiot_ascoltatore_spec.js b/test/awsiot_ascoltatore_spec.js new file mode 100644 index 0000000..8903dd7 --- /dev/null +++ b/test/awsiot_ascoltatore_spec.js @@ -0,0 +1,114 @@ +var steed = require('steed')(); + +describeAscoltatore("MQTT", function() { + + afterEach(function(done) { + this.instance.close(function() { + done(); + }); + delete this.instance; + }); + + it("should sync two instances", function(done) { + var other = new ascoltatori.AWSIoTAscoltatore(MQTTSettings()); + var that = this; + steed.series([ + + function(cb) { + other.on("ready", cb); + }, + + function(cb) { + that.instance.subscribe("hello", wrap(done), cb); + }, + + function(cb) { + other.publish("hello", null, cb); + }, + + function(cb) { + other.close(cb); + } + ]); + }); + + it("should publish with options", function(done) { + var that = this; + mqttServer.once('published', function(packet) { + expect(packet.qos).to.eql(0); + expect(packet.retain).to.eql(true); + done(); + }); + that.instance.publish("hello/123", "42", { qos: 0, retain: true }); + }); + +}); + +describe("MQTT Reconnect Test", function() { + it("should re-subscribe to topics", function(done) { + this.timeout(3000); // Set the test timeout to 3s + + var that = this; + var mosca = require("mosca"); + var msgReceived = false; + + var moscaOpts = { + port: 6884, + stats: false, + logger: { + level: "fatal" + } + }; + + var clientOpts = { + json: false, + mqtt: require("mqtt"), + host: "127.0.0.1", + port: 6884 + }; + + var mqttTestServer = new mosca.Server(moscaOpts); + var newClient = new ascoltatori.MQTTAscoltatore(clientOpts); + + steed.series([ + function(cb) { + newClient.once('ready',cb); + }, + + function(cb) { + // Subscribe to topic for test + newClient.subscribe('reconnect/test', function() { + newClient.emit('success'); + }, cb); + }, + + // Stop the MQTT server + function(cb) { + mqttTestServer.close(cb); + }, + + // Start the MQTT server + function(cb) { + mqttTestServer = new mosca.Server(moscaOpts, cb); + }, + + // Setup listener and send message + function(cb) { + newClient.once('success', function() { + msgReceived = true; + cb(); + }); + + newClient.once('ready', function(){ + newClient.publish('reconnect/test', 'blah'); + }); + }, + + ], function() { + if (msgReceived) { + done(); + } + }); + + }); +});