Skip to content

Commit

Permalink
Add ESLint
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad committed Jun 25, 2023
1 parent eaf0186 commit a6a9053
Show file tree
Hide file tree
Showing 13 changed files with 1,927 additions and 331 deletions.
28 changes: 28 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"extends": [
"eslint:recommended"
],
"plugins": [
],
"parserOptions": {
"ecmaVersion": 2018,
"sourceType": "module"
},
"env": {
"browser": true,
"node": true,
"es6": true
},
"rules": {
"no-unused-vars": [
"warn",
{
"argsIgnorePattern": "^_",
"varsIgnorePattern": "^_",
"caughtErrorsIgnorePattern": "^_"
}
],
"no-cond-assign": "warn",
"no-constant-condition": "off"
}
}
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ jobs:

# Run the tests
- run: make test

# Run linting
- run: npm run lint
2 changes: 1 addition & 1 deletion callback_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ function connect(url, options, cb) {
if (err === null) cb(null, new CallbackModel(c));
else cb(err);
});
};
}

module.exports.connect = connect;
module.exports.credentials = require('./lib/credentials');
Expand Down
2 changes: 1 addition & 1 deletion channel_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ function connect(url, connOptions) {
.then(function(conn) {
return new ChannelModel(conn);
});
};
}

module.exports.connect = connect;
module.exports.credentials = require('./lib/credentials');
Expand Down
17 changes: 9 additions & 8 deletions lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

'use strict';

var defs = require('./defs');
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
var acceptMessage = require('./channel').acceptMessage;
var Args = require('./api_args');
const util = require('util')
const defs = require('./defs');
const EventEmitter = require('events').EventEmitter;
const BaseChannel = require('./channel').BaseChannel;
const acceptMessage = require('./channel').acceptMessage;
const Args = require('./api_args');

class CallbackModel extends EventEmitter {
constructor (connection) {
Expand All @@ -26,7 +27,7 @@ class CallbackModel extends EventEmitter {

createChannel (cb) {
var ch = new Channel(this.connection);
ch.open(function (err, ok) {
ch.open(function (err, _ok) {
if (err === null)
cb && cb(null, ch);
else
Expand Down Expand Up @@ -132,7 +133,7 @@ class Channel extends BaseChannel {
this._rpc(defs.ExchangeDeclare,
Args.assertExchange(ex, type, options),
defs.ExchangeDeclareOk,
function (e, _) { cb(e, { exchange: ex }); });
function (e, _ok) { cb(e, { exchange: ex }); });
return this;
}

Expand Down Expand Up @@ -219,7 +220,7 @@ class Channel extends BaseChannel {
}
else {
cb(new Error("Unexpected response to BasicGet: " +
inspect(f)));
util.inspect(f)));
}
}
});
Expand Down
126 changes: 65 additions & 61 deletions lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class Channel extends EventEmitter {
// Internal, synchronously resolved RPC; the return value is resolved
// with the whole frame.
_rpc (method, fields, expect, cb) {
var self = this;
const self = this;

function reply (err, f) {
if (err === null) {
Expand All @@ -107,9 +107,9 @@ class Channel extends EventEmitter {
else {
// We have detected a problem, so it's up to us to close the
// channel
var expectedName = methodName(expect);
const expectedName = methodName(expect);

var e = new Error(fmt("Expected %s; got %s",
const e = new Error(fmt("Expected %s; got %s",
expectedName, inspect(f, false)));
self.closeWithError(f.id, fmt('Expected %s; got %s',
expectedName, methodName(f.id)),
Expand All @@ -129,12 +129,12 @@ class Channel extends EventEmitter {
// and the channel is closed by the server
else {
// otherwise, it's a close frame
var closeReason = (err.fields.classId << 16) + err.fields.methodId;
var e = (method === closeReason)
const closeReason = (err.fields.classId << 16) + err.fields.methodId;
const e = (method === closeReason)
? fmt("Operation failed: %s; %s",
methodName(method), closeMsg(err))
: fmt("Channel closed by server: %s", closeMsg(err));
var closeFrameError = new Error(e);
const closeFrameError = new Error(e);
closeFrameError.code = err.fields.replyCode;
closeFrameError.classId = err.fields.classId;
closeFrameError.methodId = err.fields.methodId;
Expand Down Expand Up @@ -290,63 +290,67 @@ class Channel extends EventEmitter {

switch (f.id) {

// Message frames
case undefined: // content frame!
case defs.BasicDeliver:
case defs.BasicReturn:
case defs.BasicProperties:
return this.acceptMessageFrame(f);

// confirmations, need to do confirm.select first
case defs.BasicAck:
return this.emit('ack', f.fields);
case defs.BasicNack:
return this.emit('nack', f.fields);
case defs.BasicCancel:
// The broker can send this if e.g., the queue is deleted.
return this.emit('cancel', f.fields);

case defs.ChannelClose:
// Any remote closure is an error to us. Reject the pending reply
// with the close frame, so it can see whether it was that
// operation that caused it to close.
if (this.reply) {
var reply = this.reply; this.reply = null;
reply(f);
// Message frames
case undefined: // content frame!
case defs.BasicDeliver:
case defs.BasicReturn:
case defs.BasicProperties:
return this.acceptMessageFrame(f);

// confirmations, need to do confirm.select first
case defs.BasicAck:
return this.emit('ack', f.fields);
case defs.BasicNack:
return this.emit('nack', f.fields);
case defs.BasicCancel:
// The broker can send this if e.g., the queue is deleted.
return this.emit('cancel', f.fields);

case defs.ChannelClose: {
// Any remote closure is an error to us. Reject the pending reply
// with the close frame, so it can see whether it was that
// operation that caused it to close.
if (this.reply) {
const reply = this.reply;
this.reply = null;
reply(f);
}
const emsg = "Channel closed by server: " + closeMsg(f);
this.sendImmediately(defs.ChannelCloseOk, {});

const error = new Error(emsg);
error.code = f.fields.replyCode;
error.classId = f.fields.classId;
error.methodId = f.fields.methodId;
this.emit('error', error);

const s = stackCapture(emsg);
this.toClosed(s);
return;
}
var emsg = "Channel closed by server: " + closeMsg(f);
this.sendImmediately(defs.ChannelCloseOk, {});

var error = new Error(emsg);
error.code = f.fields.replyCode;
error.classId = f.fields.classId;
error.methodId = f.fields.methodId;
this.emit('error', error);

var s = stackCapture(emsg);
this.toClosed(s);
return;

case defs.BasicFlow:
// RabbitMQ doesn't send this, it just blocks the TCP socket
return this.closeWithError(f.id, "Flow not implemented",
defs.constants.NOT_IMPLEMENTED,
new Error('Flow not implemented'));

default: // assume all other things are replies
// Resolving the reply may lead to another RPC; to make sure we
// don't hold that up, clear this.reply
var reply = this.reply; this.reply = null;
// however, maybe there's an RPC waiting to go? If so, that'll
// fill this.reply again, restoring the invariant. This does rely
// on any response being recv'ed after resolving the promise,
// below; hence, I use synchronous defer.
if (this.pending.length > 0) {
var send = this.pending.shift();
this.reply = send.reply;
this.sendImmediately(send.method, send.fields);

case defs.BasicFlow:
// RabbitMQ doesn't send this, it just blocks the TCP socket
return this.closeWithError(f.id, "Flow not implemented",
defs.constants.NOT_IMPLEMENTED,
new Error('Flow not implemented'));

default: { // assume all other things are replies
// Resolving the reply may lead to another RPC; to make sure we
// don't hold that up, clear this.reply
const reply = this.reply;
this.reply = null;
// however, maybe there's an RPC waiting to go? If so, that'll
// fill this.reply again, restoring the invariant. This does rely
// on any response being recv'ed after resolving the promise,
// below; hence, I use synchronous defer.
if (this.pending.length > 0) {
const send = this.pending.shift();
this.reply = send.reply;
this.sendImmediately(send.method, send.fields);
}
return reply(null, f);
}
return reply(null, f);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/channel_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class Channel extends BaseChannel {
this.unregisterConsumer(consumerTag);
return ok.fields;
});
return ok
}

get(queue, options) {
Expand Down
4 changes: 2 additions & 2 deletions lib/codec.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ function encodeFieldValue(buffer, value, offset) {
var start = offset;
var type = typeof value, val = value;
// A trapdoor for specifying a type, e.g., timestamp
if (value && type === 'object' && value.hasOwnProperty('!')) {
if (value && type === 'object' && Object.prototype.hasOwnProperty.call(value, '!')) {
val = value.value;
type = value['!'];
}
Expand Down Expand Up @@ -216,7 +216,7 @@ function encodeFieldValue(buffer, value, offset) {
break;
case 'decimal':
tag('D');
if (val.hasOwnProperty('places') && val.hasOwnProperty('digits')
if (Object.prototype.hasOwnProperty.call(val, 'places') && Object.prototype.hasOwnProperty.call(val, 'digits')
&& val.places >= 0 && val.places < 256) {
buffer[offset] = val.places; offset++;
buffer.writeUInt32BE(val.digits, offset); offset += 4;
Expand Down
4 changes: 2 additions & 2 deletions lib/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function openFrames(vhost, query, credentials, extraClientProperties) {
else
vhost = QS.unescape(vhost);

var query = query || {};
query = query || {};

function intOrDefault(val, def) {
return (val === undefined) ? def : parseInt(val);
Expand Down Expand Up @@ -146,7 +146,7 @@ function connect(url, socketOptions, openCallback) {
if (keepAlive) sock.setKeepAlive(keepAlive, keepAliveDelay);

var c = new Connection(sock);
c.open(fields, function(err, ok) {
c.open(fields, function(err, _ok) {
// disable timeout once the connection is open, we don't want
// it fouling things
if (timeout) sock.setTimeout(0);
Expand Down
24 changes: 12 additions & 12 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -474,34 +474,34 @@ class Connection extends EventEmitter {
}

sendMethod (channel, Method, fields) {
var frame = encodeMethod(Method, channel, fields);
const frame = encodeMethod(Method, channel, fields);
this.sentSinceLastCheck = true;
var buffer = this.channels[channel].buffer;
const buffer = this.channels[channel].buffer;
return buffer.write(frame);
}

sendMessage (channel, Method, fields, Properties, props, content) {
if (!Buffer.isBuffer(content))
throw new TypeError('content is not a buffer');

var mframe = encodeMethod(Method, channel, fields);
var pframe = encodeProperties(Properties, channel,
const mframe = encodeMethod(Method, channel, fields);
const pframe = encodeProperties(Properties, channel,
content.length, props);
var buffer = this.channels[channel].buffer;
const buffer = this.channels[channel].buffer;
this.sentSinceLastCheck = true;

var methodHeaderLen = mframe.length + pframe.length;
var bodyLen = (content.length > 0) ?
const methodHeaderLen = mframe.length + pframe.length;
const bodyLen = (content.length > 0) ?
content.length + FRAME_OVERHEAD : 0;
var allLen = methodHeaderLen + bodyLen;
const allLen = methodHeaderLen + bodyLen;

if (allLen < SINGLE_CHUNK_THRESHOLD) {
// Use `allocUnsafe` to avoid excessive allocations and CPU usage
// from zeroing. The returned Buffer is not zeroed and so must be
// completely filled to be used safely.
// See https://github.com/amqp-node/amqplib/pull/695
var all = Buffer.allocUnsafe(allLen);
var offset = mframe.copy(all, 0);
const all = Buffer.allocUnsafe(allLen);
let offset = mframe.copy(all, 0);
offset += pframe.copy(all, offset);

if (bodyLen > 0)
Expand All @@ -514,8 +514,8 @@ class Connection extends EventEmitter {
// from zeroing. The returned Buffer is not zeroed and so must be
// completely filled to be used safely.
// See https://github.com/amqp-node/amqplib/pull/695
var both = Buffer.allocUnsafe(methodHeaderLen);
var offset = mframe.copy(both, 0);
const both = Buffer.allocUnsafe(methodHeaderLen);
const offset = mframe.copy(both, 0);
pframe.copy(both, offset);
buffer.write(both);
}
Expand Down
Loading

0 comments on commit a6a9053

Please sign in to comment.