Skip to content

Commit

Permalink
Moved the content of index.js to lib/ascoltatori.js. Added docs to so…
Browse files Browse the repository at this point in the history
…me classes.

The classes doxxed are: AbstractAscoltatore, MemoryAscolatore,
ascoltatori and SubsCounter.

References moscajs#19.
  • Loading branch information
mcollina committed Feb 17, 2013
1 parent 94a6aae commit d954d55
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ docs-clean:
rm -rf docs

docs: docs-clean
./node_modules/.bin/dox-foundation --source lib --target docs
./node_modules/.bin/dox-foundation --source lib --target docs --title Ascoltatori

.PHONY: test
26 changes: 1 addition & 25 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,2 @@

var SubsCounter = require("./lib/subs_counter");
var util = require("./lib/util");
var MemoryAscoltatore = require('./lib/memory_ascoltatore');

module.exports.use = function use(ascoltatore) {
["publish", "subscribe", "unsubscribe", "close", "on",
"removeListener", "registerDomain"].forEach(function(f) {

module.exports[f] = ascoltatore[f].bind(ascoltatore);
});

util.aliasAscoltatore(this);

return this;
};

module.exports.use(new MemoryAscoltatore());

module.exports.MemoryAscoltatore = MemoryAscoltatore;
module.exports.RedisAscoltatore = require("./lib/redis_ascoltatore");
module.exports.ZeromqAscoltatore = require("./lib/zeromq_ascoltatore");
module.exports.AMQPAscoltatore = require("./lib/amqp_ascoltatore");
module.exports.MQTTAscoltatore= require("./lib/mqtt_ascoltatore");
module.exports.SubsCounter = SubsCounter;
module.exports.util = util;
module.exports = require("./lib/ascoltatori");
88 changes: 87 additions & 1 deletion lib/abstract_ascoltatore.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@

var EventEmitter = require('events').EventEmitter;

/**
* An `AbstractAscoltatore` is a class that inherits from `EventEmitter`.
* It is also the base class of `ascoltatori`. It is not meant to be used alone,
* but it defines the interface for every ascoltatore.
*
* Every ascoltatore emits the following events:
* - `ready`, when the ascolatore is ready for subscribing and/or
* publishing messages;
* - `closed`, when the ascoltatore has closed all the connections
* and therefore it cannot accept new messages.
*
* @api public
*/
function AbstractAscoltatore() {
EventEmitter.call(this);

Expand Down Expand Up @@ -30,9 +43,82 @@ AbstractAscoltatore.prototype = Object.create(EventEmitter.prototype);

AbstractAscoltatore.prototype._raiseIfClosed = function raiseIfClosed() {
if(this._closed)
throw "This ascoltatore " + this.prototype.name + " is closed";
throw new Error("This ascoltatore " + this.prototype.name + " is closed");
};

/**
* This method provides a way for users to subscribe for messages.
*
* The messages are published on topics, that is just a "path", e.g.
* `/this/is/a/topic`.
* The topic are organized in a hierarchy, and `subscribe` support the usage
* of wildcards, e.g. you can subscribe to `/this/*\/topic` and it will
* match all the topics
*
* Example:
* ascoltatore.subscribe("hello/*", function() {
* // this will print { '0': "hello/42", '1': "a message" }
* console.log(arguments);
* });
*
* @param {String} topic the topic to subscribe to
* @param {Function} callback the callback that will be called when a new message is published.
* @param {Function} done the callback that will be called when the subscribe is completed
* @api public
*/
AbstractAscoltatore.prototype.subscribe = function(topic, callback, done) {
throw new Error("Subclass to implement");
};

/**
* This method allow publishing of messages to topics.
*
* Example:
* ascoltatore.publish("hello/42", "a message", function() {
* console.log("message published");
* });
*
*
* @param {String} topic the topic to publish to
* @param {Object} payload the callback that will be called when a new message is published.
* @param {Function} done the callback that will be called after the message has been published.
* @api public
*/
AbstractAscoltatore.prototype.publish = function(topic, payload, done) {
throw new Error("Subclass to implement");
};

/**
* This method provides the inverse of subscribe.
*
* @param {String} topic the topic from which to unsubscribe
* @param {Function} callback the callback that will be unsubscribed
* @param {Function} done the callback that will be called when the unsubscribe is completed
* @api public
*/
AbstractAscoltatore.prototype.unsubscribe = function(topic, callback, done) {
throw new Error("Subclass to implement");
};

/**
* This method closes the Ascoltatore.
* After this method is called every call to subscribe or publish will raise
* an exception
*
* @param {Function} done the callback that will be called when Ascoltatore is closed
* @api public
*/
AbstractAscoltatore.prototype.close = function(done) {
throw new Error("Subclass to implement");
};

/**
* You can register a nodejs domain so that every callback is
* jailed and cannot crash the process.
*
* @param {Domain} domain the node.js error domain to use.
* @api public
*/
AbstractAscoltatore.prototype.registerDomain = function(domain) {
this._ascoltatore.registerDomain(domain);
};
Expand Down
50 changes: 50 additions & 0 deletions lib/ascoltatori.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@

var SubsCounter = require("./subs_counter");
var util = require("./util");

/**
* You can require any Ascolatore through this module.
*
* @api public
*/
module.exports.MemoryAscoltatore = require('./memory_ascoltatore');
module.exports.RedisAscoltatore = require("./redis_ascoltatore");
module.exports.ZeromqAscoltatore = require("./zeromq_ascoltatore");
module.exports.AMQPAscoltatore = require("./amqp_ascoltatore");
module.exports.MQTTAscoltatore= require("./mqtt_ascoltatore");

/**
* Use an Ascoltatore as a global pub/sub broker inside
* the current node process.
* Everyone requiring ascoltatori can use it.
*
* @param {AbstractAscoltatore} ascoltatore the Ascoltatore to use
* @return {Object} the `ascoltatori` module
* @api public
*/
module.exports.use = function use(ascoltatore) {
["publish", "subscribe", "unsubscribe", "close", "on",
"removeListener", "registerDomain"].forEach(function(f) {

module.exports[f] = ascoltatore[f].bind(ascoltatore);
});

util.aliasAscoltatore(this);

return this;
};

/**
* The default global Ascoltatore is a MemoryAscoltatore.
*
* @api public
*/
module.exports.use(new module.exports.MemoryAscoltatore());

/**
* These are just utilities
*
* @api private
*/
module.exports.SubsCounter = SubsCounter;
module.exports.util = util;
13 changes: 13 additions & 0 deletions lib/memory_ascoltatore.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ var util = require("./util");
var wrap = util.wrap;
var defer = util.defer;

/**
* A MemoryAscoltatore is a class that inherits from AbstractAscoltatore.
* It is backend by an EventEmitter and an object-map.
*
* @api public
*/
function MemoryAscoltatore() {
AbstractAscoltatore.call(this);

Expand All @@ -16,6 +22,13 @@ function MemoryAscoltatore() {
this._event.setMaxListeners(0);
}


/**
* See AbstractAscoltatore for the public API definitions.
*
* @api private
*/

MemoryAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype);

MemoryAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) {
Expand Down
61 changes: 61 additions & 0 deletions lib/subs_counter.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@

/**
* This is a counter for the subscriptions.
* It is used by all the ascoltatori to keep track to
* for what topics there are subscribers.
*
* @api private
*/
function SubsCounter() {
this.clear();
}

/**
* Clear the SubsCounter
*
* @api private
*/
SubsCounter.prototype.clear = function() {
this._length = 0;
this._obj = {};
return this;
};

/**
* Add a new element to the SubsCounter.
* The SubsCounter keep track of the number of times
* this method was called for every passed `elem`.
*
* @param {String} elem The element to track
* @return {SubsCounter}
* @api private
*/
SubsCounter.prototype.add = function(elem) {
if(!this.include(elem)) {
this._length += 1;
Expand All @@ -20,6 +41,15 @@ SubsCounter.prototype.add = function(elem) {
return this;
};

/**
* Removes an element.
* The SubsCounter keep track of the total
* times an `elem` is added or removed.
*
* @api private
* @param {String} elem The element to track
* @return {SubsCounter}
*/
SubsCounter.prototype.remove = function(elem) {
if(!this.include(elem)) {
return this;
Expand All @@ -35,10 +65,29 @@ SubsCounter.prototype.remove = function(elem) {
return this;
};

/**
* The SubsCounter keep track of the total
* times an `elem` is added or removed, and it
* offers the `include` method to verify if it is
* greater than zero.
*
* @param {String} elem The element to track
* @return {boolean} true if the element has more than one subscription
* @api private
*/
SubsCounter.prototype.include = function(elem) {
return this._obj[elem] !== undefined;
};


/**
* List all the elements for which `include` returns true.
*
* @api private
* @param {Function} callback the function where the elements will be
* yield
* @return {SubsCounter}
*/
SubsCounter.prototype.forEach = function(callback) {
for(var key in this._obj) {
if(this._obj.hasOwnProperty(key)) {
Expand All @@ -49,12 +98,24 @@ SubsCounter.prototype.forEach = function(callback) {
return this;
};

/**
* List all the elements for which `include` returns true.
*
* @api private
* @return {Array} a list of elements
*/
SubsCounter.prototype.keys = function() {
var array = [];
this.forEach(function(e) { array.push(e) });
return array;
};

/**
* Returns the number of elements for which `include` returns true.
*
* @api private
* @return {Number}
*/
SubsCounter.prototype.__defineGetter__("length", function() {
return this._length;
});
Expand Down

0 comments on commit d954d55

Please sign in to comment.