Skip to content

Commit

Permalink
Added AWS IoT abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
owenshoemaker committed May 14, 2019
1 parent bf1dc13 commit a0fcb78
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 1 deletion.
4 changes: 3 additions & 1 deletion lib/ascoltatori.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module.exports.ZeromqAscoltatore = require("./zeromq_ascoltatore");
module.exports.AMQPAscoltatore = require("./amqp_ascoltatore");
module.exports.AMQPLibAscoltatore = require("./amqplib_ascoltatore");
module.exports.MQTTAscoltatore = require("./mqtt_ascoltatore");
module.exports.AWSIoTAscoltatore = require("./awsiot_ascoltatore");
module.exports.PrefixAscoltatore = require("./prefix_acoltatore");
module.exports.MongoAscoltatore = require('./mongo_ascoltatore');
module.exports.DecoratorAscoltatore = require("./decorator_ascoltatore");
Expand All @@ -33,6 +34,7 @@ var classes = {
"trie": module.exports.TrieAscoltatore,
"eventemitter2": module.exports.EventEmitter2Ascoltatore,
"mqtt": module.exports.MQTTAscoltatore,
"awsiot": module.exports.AWSIoTAscoltatore,
"redis": module.exports.RedisAscoltatore,
"zmq": module.exports.ZeromqAscoltatore,
"mongo": module.exports.MongoAscoltatore,
Expand All @@ -46,7 +48,7 @@ var classes = {
* present.
* The other options are passed through the constructor of the
* Ascoltatore
*
*
* Options:
* - `type`, it can be "amqp", "trie", "eventemitter2", "redis", "zmq", or just a class
* that will be instantiated (i.e. with `new`).
Expand Down
189 changes: 189 additions & 0 deletions lib/awsiot_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"use strict";

var util = require("./util");
var wrap = util.wrap;
var defer = util.defer;
var TrieAscoltatore = require("./trie_ascoltatore");
var AbstractAscoltatore = require('./abstract_ascoltatore');
var debug = require("debug")("ascoltatori:awsiot");
var SubsCounter = require("./subs_counter");
var steed = require("steed")();

/**
* AWSIoTAscoltatore is a class that inherits from AbstractAscoltatore.
* It is implemented through the `mqtt` package and it could be
* backed up by any MQTT broker out there.
*
* The options are:
* - `url`: the URL to connect to, as defined in https://www.npmjs.com/package/mqtt#connect
* - ... all the options defined in https://www.npmjs.com/package/mqtt#connect
*
* @api public
* @param {Object} opts The options object
*/
function AWSIoTAscoltatore(opts) {
AbstractAscoltatore.call(this, opts, {
separator: '/',
wildcardOne: '+',
wildcardSome: '#'
});

this._opts = opts || {};
this._opts.keepalive = this._opts.keepalive || 3000;
this._opts.awsiot = this._opts.awsiot || require('aws-iot-device-sdk');

this._subs_counter = new SubsCounter();

this._ascoltatore = new TrieAscoltatore(opts);
this._startConn();
}

/**
* AWSIoTAscoltatore inherits from AbstractAscoltatore
*
* @api private
*/
AWSIoTAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype);

/**
* Starts a new connection to an MQTT server.
* Do nothing if it is already started.
*
* @api private
*/
AWSIoTAscoltatore.prototype._startConn = function() {
var that = this;

if (this._client === undefined) {
debug("connecting..");
this._client = this._opts.awsiot.device(that._opts);

this._client.setMaxListeners(0);
this._client.on("connect", function() {
debug("connected");
that.reconnectTopics(function(){
that.emit("ready");
});
});

this._client.on("message", function(topic, payload, packet) {
debug("received new packet on topic " + topic);
// we need to skip out this callback, so we do not
// break the client when an exception occurs
defer(function() {
that._ascoltatore.publish(that._recvTopic(topic), payload, packet);
});
});
this._client.on('error', function(e) {
debug("error in the client");

delete that._client;
that.emit("error", e);
});
}
return this._client;
};

AWSIoTAscoltatore.prototype.reconnectTopics = function reconnectTopics(cb) {
var that = this;

var subscribedTopics = that._subs_counter.keys();

var opts = {
qos: 1
};

steed.each(subscribedTopics, function(topic, callback) {
that._client.subscribe(that._subTopic(topic), opts, function() {
debug("re-registered subscriber for topic " + topic);
callback();
});
}, function(){
cb();
});

};

AWSIoTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) {
this._raiseIfClosed();

if (!this._subs_counter.include(topic)) {
debug("registering new subscriber for topic " + topic);

var opts = {
qos: 1
};

this._client.subscribe(this._subTopic(topic), opts, function() {
debug("registered new subscriber for topic " + topic);
defer(done);
});

} else {
defer(done);
}

this._subs_counter.add(topic);
this._ascoltatore.subscribe(topic, callback);
};

AWSIoTAscoltatore.prototype.publish = function publish(topic, message, options, done) {
this._raiseIfClosed();

this._client.publish(this._pubTopic(topic), message, {
qos: (options && (options.qos !== undefined)) ? options.qos : 1,
retain: (options && (options.retain !== undefined)) ? options.retain : false
}, function() {
debug("new message published to " + topic);
wrap(done)();
});
};

AWSIoTAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) {
this._raiseIfClosed();

var newDone = null;

newDone = function() {
debug("deregistered subscriber for topic " + topic);
defer(done);
};

this._ascoltatore.unsubscribe(topic, callback);
this._subs_counter.remove(topic);

if (this._subs_counter.include(topic)) {
newDone();
return;
}

debug("deregistering subscriber for topic " + topic);
this._client.unsubscribe(this._subTopic(topic), newDone);
};

AWSIoTAscoltatore.prototype.close = function close(done) {
var that = this;
debug("closing");
if (!this._closed) {
this._subs_counter.clear();
this._client.once("close", function() {
debug("closed");
that._ascoltatore.close();
delete that._client;
that.emit("closed");
defer(done);
});
this._client.end();
} else {
wrap(done)();
}
};

util.aliasAscoltatore(AWSIoTAscoltatore.prototype);

/**
* Exports the AWSIoTAscoltatore
*
* @api public
*/
module.exports = AWSIoTAscoltatore;
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"pre-commit": "^1.1.2"
},
"dependencies": {
"aws-iot-device-sdk": "^2.2.1",
"debug": "^2.2.0",
"qlobber": "~0.7.0",
"steed": "^1.1.3",
Expand Down
114 changes: 114 additions & 0 deletions test/awsiot_ascoltatore_spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
var steed = require('steed')();

describeAscoltatore("MQTT", function() {

afterEach(function(done) {
this.instance.close(function() {
done();
});
delete this.instance;
});

it("should sync two instances", function(done) {
var other = new ascoltatori.AWSIoTAscoltatore(MQTTSettings());
var that = this;
steed.series([

function(cb) {
other.on("ready", cb);
},

function(cb) {
that.instance.subscribe("hello", wrap(done), cb);
},

function(cb) {
other.publish("hello", null, cb);
},

function(cb) {
other.close(cb);
}
]);
});

it("should publish with options", function(done) {
var that = this;
mqttServer.once('published', function(packet) {
expect(packet.qos).to.eql(0);
expect(packet.retain).to.eql(true);
done();
});
that.instance.publish("hello/123", "42", { qos: 0, retain: true });
});

});

describe("MQTT Reconnect Test", function() {
it("should re-subscribe to topics", function(done) {
this.timeout(3000); // Set the test timeout to 3s

var that = this;
var mosca = require("mosca");
var msgReceived = false;

var moscaOpts = {
port: 6884,
stats: false,
logger: {
level: "fatal"
}
};

var clientOpts = {
json: false,
mqtt: require("mqtt"),
host: "127.0.0.1",
port: 6884
};

var mqttTestServer = new mosca.Server(moscaOpts);
var newClient = new ascoltatori.MQTTAscoltatore(clientOpts);

steed.series([
function(cb) {
newClient.once('ready',cb);
},

function(cb) {
// Subscribe to topic for test
newClient.subscribe('reconnect/test', function() {
newClient.emit('success');
}, cb);
},

// Stop the MQTT server
function(cb) {
mqttTestServer.close(cb);
},

// Start the MQTT server
function(cb) {
mqttTestServer = new mosca.Server(moscaOpts, cb);
},

// Setup listener and send message
function(cb) {
newClient.once('success', function() {
msgReceived = true;
cb();
});

newClient.once('ready', function(){
newClient.publish('reconnect/test', 'blah');
});
},

], function() {
if (msgReceived) {
done();
}
});

});
});

0 comments on commit a0fcb78

Please sign in to comment.