Skip to content

Commit

Permalink
feat: expose AmqpConnectionManagerClass
Browse files Browse the repository at this point in the history
The naming is strange because I don't want to introduce any
breaking change.

Note that jest complains about leaking resources due to:
amqp-node/amqplib#584
  • Loading branch information
luddd3 committed Aug 30, 2021
1 parent 36f234b commit e86ed3c
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 16 deletions.
35 changes: 34 additions & 1 deletion src/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import amqp, { Connection } from 'amqplib';
import { EventEmitter } from 'events';
import { EventEmitter, once } from 'events';
import { TcpSocketConnectOpts } from 'net';
import pb from 'promise-breaker';
import { ConnectionOptions } from 'tls';
Expand Down Expand Up @@ -82,6 +82,8 @@ export interface IAmqpConnectionManager {
addListener(event: 'unblocked', listener: () => void): this;
addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;

listeners(eventName: string | symbol): any;

on(event: string, listener: (...args: any[]) => void): this;
on(event: 'connect', listener: ConnectListener): this;
on(event: 'blocked', listener: (arg: { reason: string }) => void): this;
Expand All @@ -108,6 +110,8 @@ export interface IAmqpConnectionManager {

removeListener(event: string, listener: (...args: any[]) => void): this;

connect(options?: { timeout?: number }): Promise<void>;
reconnect(): void;
createChannel(options?: CreateChannelOpts): ChannelWrapper;
close(): Promise<void>;
isConnected(): boolean;
Expand Down Expand Up @@ -196,8 +200,37 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
this.setMaxListeners(0);

this._findServers = options.findServers || (() => Promise.resolve(urls));
}

async connect({ timeout }: { timeout?: number } = {}): Promise<void> {
this._connect();

let reject: (reason?: any) => void;
const onDisconnect = ({ err }: { err: any }) => {
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials.
if (err.isOperational) {
reject(err);
}
};

try {
await Promise.race([
once(this, 'connect'),
new Promise((_resolve, innerReject) => {
reject = innerReject;
this.on('disconnect', onDisconnect);
}),
...(timeout
? [
wait(timeout).promise.then(() => {
throw new Error('amqp-connection-manager: connect timeout');
}),
]
: []),
]);
} finally {
this.removeListener('disconnect', onDisconnect);
}
}

// `options` here are any options that can be passed to ChannelWrapper.
Expand Down
8 changes: 7 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ export function connect(
urls: ConnectionUrl | ConnectionUrl[] | undefined | null,
options?: AmqpConnectionManagerOptions
): IAmqpConnectionManager {
return new AmqpConnectionManager(urls, options);
const conn = new AmqpConnectionManager(urls, options);
conn.connect().catch(() => {
/* noop */
});
return conn;
}

export { AmqpConnectionManager as AmqpConnectionManagerClass };

const amqp = { connect };

export default amqp;
48 changes: 37 additions & 11 deletions test/AmqpConnectionManagerTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe('AmqpConnectionManager', function () {

it('should establish a connection to a broker', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
amqp.connect();
const [{ connection, url }] = await once(amqp, 'connect');
expect(url, 'url').to.equal('amqp://localhost');
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
Expand All @@ -37,6 +38,7 @@ describe('AmqpConnectionManager', function () {
protocol: 'amqp',
hostname: 'localhost',
});
amqp.connect();
const [{ connection, url }] = await once(amqp, 'connect');
expect(url, 'url').to.eql({
protocol: 'amqp',
Expand All @@ -51,14 +53,15 @@ describe('AmqpConnectionManager', function () {

it('should establish a url object based connection to a broker', async () => {
amqp = new AmqpConnectionManager({ url: 'amqp://localhost' });

amqp.connect();
const [{ connection, url }] = await once(amqp, 'connect');
expect(url, 'url').to.equal('amqp://localhost');
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
});

it('should close connection to a broker', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
amqp.connect();
const [{ connection, url }] = await once(amqp, 'connect');
expect(url, 'url').to.equal('amqp://localhost');
expect((connection as any).url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
Expand All @@ -77,6 +80,7 @@ describe('AmqpConnectionManager', function () {
let connected = false;

amqp = new AmqpConnectionManager('amqp://localhost');
amqp.connect();
// Connection should not yet be established
expect(amqp.connection, 'current connection').to.equal(undefined);
// Connection should be pending though
Expand Down Expand Up @@ -123,6 +127,7 @@ describe('AmqpConnectionManager', function () {
return Promise.resolve('amqp://localhost');
},
});
amqp.connect();
const [{ connection, url }] = await once(amqp, 'connect');
expect(url, 'url').to.equal('amqp://localhost');
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
Expand All @@ -134,6 +139,7 @@ describe('AmqpConnectionManager', function () {
return Promise.resolve({ url: 'amqp://localhost' });
},
});
amqp.connect();
const [{ connection, url }] = await once(amqp, 'connect');
expect(url, 'url').to.equal('amqp://localhost');
expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5');
Expand All @@ -145,13 +151,29 @@ describe('AmqpConnectionManager', function () {
return Promise.resolve(null);
},
});
amqp.connect();
const [{ err }] = await once(amqp, 'disconnect');
expect(err.message).to.contain('No servers found');
return amqp?.close();
});

it('should timeout connect', async () => {
jest.spyOn(origAmpq, 'connect').mockImplementation((): any => {
return promiseTools.delay(200);
});
amqp = new AmqpConnectionManager('amqp://localhost');
let err;
try {
await amqp.connect({ timeout: 0.1 });
} catch (error) {
err = error;
}
expect(err.message).to.equal('amqp-connection-manager: connect timeout');
});

it('should work with a URL with a query', async () => {
amqp = new AmqpConnectionManager('amqp://localhost?frameMax=0x1000');
amqp.connect();
const [{ connection }] = await once(amqp, 'connect');
expect(connection.url, 'connection.url').to.equal(
'amqp://localhost?frameMax=0x1000&heartbeat=5'
Expand All @@ -171,6 +193,7 @@ describe('AmqpConnectionManager', function () {
amqp = new AmqpConnectionManager(['amqp://rabbit1', 'amqp://rabbit2'], {
heartbeatIntervalInSeconds: 0.01,
});
amqp.connect();

let disconnectEventsSeen = 0;
amqp.on('disconnect', function () {
Expand All @@ -196,10 +219,10 @@ describe('AmqpConnectionManager', function () {
let disconnectsSeen = 0;
amqp.on('disconnect', () => disconnectsSeen++);

await once(amqp, 'connect');
await amqp.connect();
amqplib.kill();

await once(amqp, 'connect');
await amqp.connect();
expect(disconnectsSeen).to.equal(1);
});

Expand All @@ -211,7 +234,7 @@ describe('AmqpConnectionManager', function () {
let disconnectsSeen = 0;
amqp.on('disconnect', () => disconnectsSeen++);

await once(amqp, 'connect');
await amqp.connect();

// Close the connection nicely
amqplib.simulateRemoteClose();
Expand All @@ -222,6 +245,7 @@ describe('AmqpConnectionManager', function () {

it('should know if it is connected or not', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
amqp.connect();

expect(amqp.isConnected()).to.be.false;

Expand All @@ -231,7 +255,7 @@ describe('AmqpConnectionManager', function () {

it('should be able to manually reconnect', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
await once(amqp, 'connect');
await amqp.connect();

amqp.reconnect();
await once(amqp, 'disconnect');
Expand All @@ -240,13 +264,14 @@ describe('AmqpConnectionManager', function () {

it('should throw on manual reconnect after close', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
await once(amqp, 'connect');
await amqp.close()
expect(amqp.reconnect).to.throw()
})
await amqp.connect();
await amqp.close();
expect(amqp.reconnect).to.throw();
});

it('should create and clean up channel wrappers', async function () {
amqp = new AmqpConnectionManager('amqp://localhost');
await amqp.connect();
const channel = amqp.createChannel({ name: 'test-chan' });

// Channel should register with connection manager
Expand All @@ -264,6 +289,7 @@ describe('AmqpConnectionManager', function () {

it('should clean up channels on close', async function () {
amqp = new AmqpConnectionManager('amqp://localhost');
await amqp.connect();
amqp.createChannel({ name: 'test-chan' });

// Channel should register with connection manager
Expand All @@ -286,7 +312,7 @@ describe('AmqpConnectionManager', function () {

let connectsSeen = 0;
amqp.on('connect', () => connectsSeen++);
await once(amqp, 'connect');
await amqp.connect();

// Close the manager
await amqp?.close();
Expand All @@ -308,7 +334,7 @@ describe('AmqpConnectionManager', function () {

amqp.on('unblocked', () => unblockSeen++);

await once(amqp, 'connect');
await amqp.connect();
// Close the connection nicely
amqplib.simulateRemoteBlock();
amqplib.simulateRemoteUnblock();
Expand Down
11 changes: 10 additions & 1 deletion test/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */

import { Connection, Message, Options, Replies } from 'amqplib';
import { EventEmitter } from 'events';
import { EventEmitter, once } from 'events';
import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';
import ChannelWrapper, { CreateChannelOpts } from '../src/ChannelWrapper';

Expand Down Expand Up @@ -194,6 +194,15 @@ export class FakeAmqpConnectionManager extends EventEmitter implements IAmqpConn
return 0;
}

async connect(): Promise<void> {
await Promise.all([once(this, 'connect'), this.simulateConnect()]);
}

reconnect(): void {
this.simulateDisconnect();
this.simulateConnect();
}

isConnected() {
return this.connected;
}
Expand Down
6 changes: 5 additions & 1 deletion test/importTest.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { expect } from 'chai';
import amqp from '../src';
import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src';

describe('import test', function () {
it('should let you import as default (#51)', function () {
expect(amqp).to.exist;
expect(amqp.connect).to.exist;
});

it('should let you import class', function () {
new AmqpConnectionManager('url');
});
});
15 changes: 14 additions & 1 deletion test/integrationTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import chai from 'chai';
import chaiJest from 'chai-jest';
import pEvent from 'p-event';
import { defer, timeout } from 'promise-tools';
import amqp from '../src';
import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src';
import { IAmqpConnectionManager } from '../src/AmqpConnectionManager';

chai.use(chaiJest);
Expand Down Expand Up @@ -69,6 +69,19 @@ describe('Integration tests', () => {
await timeout(pEvent(connection, 'connect'), 3000);
});

// This test might cause jest to complain about leaked resources due to the bug described and fixed by:
// https://github.com/squaremo/amqp.node/pull/584
it('should throw on awaited connect with wrong password', async () => {
connection = new AmqpConnectionManager('amqp://guest:wrong@localhost');
let err;
try {
await connection.connect();
} catch (error) {
err = error;
}
expect(err.message).to.contain('ACCESS-REFUSED');
});

it('send and receive messages', async () => {
const queueName = 'testQueue1';
const content = `hello world - ${Date.now()}`;
Expand Down

0 comments on commit e86ed3c

Please sign in to comment.