diff --git a/src/AmqpConnectionManager.ts b/src/AmqpConnectionManager.ts index 8eeaf01..11fb7d2 100644 --- a/src/AmqpConnectionManager.ts +++ b/src/AmqpConnectionManager.ts @@ -82,6 +82,8 @@ export interface IAmqpConnectionManager { addListener(event: 'unblocked', listener: () => void): this; addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this; + listeners(eventName: string | symbol): any; + on(event: string, listener: (...args: any[]) => void): this; on(event: 'connect', listener: ConnectListener): this; on(event: 'blocked', listener: (arg: { reason: string }) => void): this; @@ -108,6 +110,8 @@ export interface IAmqpConnectionManager { removeListener(event: string, listener: (...args: any[]) => void): this; + connect(): Promise; + reconnect(): void; createChannel(options?: CreateChannelOpts): ChannelWrapper; close(): Promise; isConnected(): boolean; @@ -196,8 +200,22 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp this.setMaxListeners(0); this._findServers = options.findServers || (() => Promise.resolve(urls)); + } + connect(): Promise { this._connect(); + return new Promise((resolve, reject) => { + const onConnect = () => { + this.removeListener('disconnect', onDisconnect); + resolve(); + }; + const onDisconnect = ({ err }: { err: Error }) => { + this.removeListener('connect', onConnect); + reject(err); + }; + this.once('connect', onConnect); + this.once('disconnect', onDisconnect); + }); } // `options` here are any options that can be passed to ChannelWrapper. diff --git a/src/index.ts b/src/index.ts index 7e32324..01e7224 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,9 +15,15 @@ export function connect( urls: ConnectionUrl | ConnectionUrl[] | undefined | null, options?: AmqpConnectionManagerOptions ): IAmqpConnectionManager { - return new AmqpConnectionManager(urls, options); + const conn = new AmqpConnectionManager(urls, options); + conn.connect().catch(() => { + /* noop */ + }); + return conn; } +export { AmqpConnectionManager as AmqpConnectionManagerClass }; + const amqp = { connect }; export default amqp; diff --git a/test/AmqpConnectionManagerTest.ts b/test/AmqpConnectionManagerTest.ts index 560a60c..8f5314b 100644 --- a/test/AmqpConnectionManagerTest.ts +++ b/test/AmqpConnectionManagerTest.ts @@ -27,6 +27,7 @@ describe('AmqpConnectionManager', function () { it('should establish a connection to a broker', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -37,6 +38,7 @@ describe('AmqpConnectionManager', function () { protocol: 'amqp', hostname: 'localhost', }); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.eql({ protocol: 'amqp', @@ -51,7 +53,7 @@ describe('AmqpConnectionManager', function () { it('should establish a url object based connection to a broker', async () => { amqp = new AmqpConnectionManager({ url: 'amqp://localhost' }); - + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -59,6 +61,7 @@ describe('AmqpConnectionManager', function () { it('should close connection to a broker', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect((connection as any).url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -77,6 +80,7 @@ describe('AmqpConnectionManager', function () { let connected = false; amqp = new AmqpConnectionManager('amqp://localhost'); + amqp.connect(); // Connection should not yet be established expect(amqp.connection, 'current connection').to.equal(undefined); // Connection should be pending though @@ -123,6 +127,7 @@ describe('AmqpConnectionManager', function () { return Promise.resolve('amqp://localhost'); }, }); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -134,6 +139,7 @@ describe('AmqpConnectionManager', function () { return Promise.resolve({ url: 'amqp://localhost' }); }, }); + amqp.connect(); const [{ connection, url }] = await once(amqp, 'connect'); expect(url, 'url').to.equal('amqp://localhost'); expect(connection.url, 'connection.url').to.equal('amqp://localhost?heartbeat=5'); @@ -145,6 +151,7 @@ describe('AmqpConnectionManager', function () { return Promise.resolve(null); }, }); + amqp.connect(); const [{ err }] = await once(amqp, 'disconnect'); expect(err.message).to.contain('No servers found'); return amqp?.close(); @@ -152,6 +159,7 @@ describe('AmqpConnectionManager', function () { it('should work with a URL with a query', async () => { amqp = new AmqpConnectionManager('amqp://localhost?frameMax=0x1000'); + amqp.connect(); const [{ connection }] = await once(amqp, 'connect'); expect(connection.url, 'connection.url').to.equal( 'amqp://localhost?frameMax=0x1000&heartbeat=5' @@ -171,6 +179,7 @@ describe('AmqpConnectionManager', function () { amqp = new AmqpConnectionManager(['amqp://rabbit1', 'amqp://rabbit2'], { heartbeatIntervalInSeconds: 0.01, }); + amqp.connect(); let disconnectEventsSeen = 0; amqp.on('disconnect', function () { @@ -196,10 +205,10 @@ describe('AmqpConnectionManager', function () { let disconnectsSeen = 0; amqp.on('disconnect', () => disconnectsSeen++); - await once(amqp, 'connect'); + await amqp.connect(); amqplib.kill(); - await once(amqp, 'connect'); + await amqp.connect(); expect(disconnectsSeen).to.equal(1); }); @@ -211,7 +220,7 @@ describe('AmqpConnectionManager', function () { let disconnectsSeen = 0; amqp.on('disconnect', () => disconnectsSeen++); - await once(amqp, 'connect'); + await amqp.connect(); // Close the connection nicely amqplib.simulateRemoteClose(); @@ -222,6 +231,7 @@ describe('AmqpConnectionManager', function () { it('should know if it is connected or not', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); + amqp.connect(); expect(amqp.isConnected()).to.be.false; @@ -231,7 +241,7 @@ describe('AmqpConnectionManager', function () { it('should be able to manually reconnect', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); - await once(amqp, 'connect'); + await amqp.connect(); amqp.reconnect(); await once(amqp, 'disconnect'); @@ -240,13 +250,14 @@ describe('AmqpConnectionManager', function () { it('should throw on manual reconnect after close', async () => { amqp = new AmqpConnectionManager('amqp://localhost'); - await once(amqp, 'connect'); - await amqp.close() - expect(amqp.reconnect).to.throw() - }) + await amqp.connect(); + await amqp.close(); + expect(amqp.reconnect).to.throw(); + }); it('should create and clean up channel wrappers', async function () { amqp = new AmqpConnectionManager('amqp://localhost'); + await amqp.connect(); const channel = amqp.createChannel({ name: 'test-chan' }); // Channel should register with connection manager @@ -264,6 +275,7 @@ describe('AmqpConnectionManager', function () { it('should clean up channels on close', async function () { amqp = new AmqpConnectionManager('amqp://localhost'); + await amqp.connect(); amqp.createChannel({ name: 'test-chan' }); // Channel should register with connection manager @@ -286,7 +298,7 @@ describe('AmqpConnectionManager', function () { let connectsSeen = 0; amqp.on('connect', () => connectsSeen++); - await once(amqp, 'connect'); + await amqp.connect(); // Close the manager await amqp?.close(); @@ -308,7 +320,7 @@ describe('AmqpConnectionManager', function () { amqp.on('unblocked', () => unblockSeen++); - await once(amqp, 'connect'); + await amqp.connect(); // Close the connection nicely amqplib.simulateRemoteBlock(); amqplib.simulateRemoteUnblock(); diff --git a/test/fixtures.ts b/test/fixtures.ts index e9e4ef6..ff75607 100644 --- a/test/fixtures.ts +++ b/test/fixtures.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/explicit-module-boundary-types */ import { Connection, Message, Options, Replies } from 'amqplib'; -import { EventEmitter } from 'events'; +import { EventEmitter, once } from 'events'; import { IAmqpConnectionManager } from '../src/AmqpConnectionManager'; import ChannelWrapper, { CreateChannelOpts } from '../src/ChannelWrapper'; @@ -194,6 +194,15 @@ export class FakeAmqpConnectionManager extends EventEmitter implements IAmqpConn return 0; } + async connect(): Promise { + await Promise.all([once(this, 'connect'), this.simulateConnect()]); + } + + reconnect(): void { + this.simulateDisconnect(); + this.simulateConnect(); + } + isConnected() { return this.connected; } diff --git a/test/importTest.ts b/test/importTest.ts index 1a35464..d8f6035 100644 --- a/test/importTest.ts +++ b/test/importTest.ts @@ -1,9 +1,13 @@ import { expect } from 'chai'; -import amqp from '../src'; +import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src'; describe('import test', function () { it('should let you import as default (#51)', function () { expect(amqp).to.exist; expect(amqp.connect).to.exist; }); + + it('should let you import class', function () { + new AmqpConnectionManager('url'); + }); }); diff --git a/test/integrationTest.ts b/test/integrationTest.ts index ed589b9..6c02566 100644 --- a/test/integrationTest.ts +++ b/test/integrationTest.ts @@ -3,7 +3,7 @@ import chai from 'chai'; import chaiJest from 'chai-jest'; import pEvent from 'p-event'; import { defer, timeout } from 'promise-tools'; -import amqp from '../src'; +import amqp, { AmqpConnectionManagerClass as AmqpConnectionManager } from '../src'; import { IAmqpConnectionManager } from '../src/AmqpConnectionManager'; chai.use(chaiJest); @@ -69,6 +69,19 @@ describe('Integration tests', () => { await timeout(pEvent(connection, 'connect'), 3000); }); + // This test causes jest to complain about leaked resources due to the bug described and fixed by: + // https://github.com/squaremo/amqp.node/pull/584 + it('should throw on awaited connect with wrong password', async () => { + connection = new AmqpConnectionManager('amqp://guest:wrong@localhost'); + let err; + try { + await connection.connect(); + } catch (error) { + err = error; + } + expect(err.message).to.contain('ACCESS-REFUSED'); + }); + it('send and receive messages', async () => { const queueName = 'testQueue1'; const content = `hello world - ${Date.now()}`;