Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose AmqpConnectionManagerClass #181

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# [3.6.0](https://github.com/jwalton/node-amqp-connection-manager/compare/v3.5.2...v3.6.0) (2021-08-27)

### Features

- reconnect and cancelAll consumers ([fb0c00b](https://github.com/jwalton/node-amqp-connection-manager/commit/fb0c00becc224ffedd28e810cbb314187d21efdb))

## [3.5.2](https://github.com/jwalton/node-amqp-connection-manager/compare/v3.5.1...v3.5.2) (2021-08-26)

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "amqp-connection-manager",
"version": "3.5.2",
"version": "3.6.0",
"description": "Auto-reconnect and round robin support for amqplib.",
"module": "./dist/esm/index.js",
"main": "./dist/cjs/index.js",
Expand Down
41 changes: 40 additions & 1 deletion src/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import amqp, { Connection } from 'amqplib';
import { EventEmitter } from 'events';
import { EventEmitter, once } from 'events';
import { TcpSocketConnectOpts } from 'net';
import pb from 'promise-breaker';
import { ConnectionOptions } from 'tls';
Expand Down Expand Up @@ -82,6 +82,8 @@ export interface IAmqpConnectionManager {
addListener(event: 'unblocked', listener: () => void): this;
addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;

listeners(eventName: string | symbol): any;

on(event: string, listener: (...args: any[]) => void): this;
on(event: 'connect', listener: ConnectListener): this;
on(event: 'blocked', listener: (arg: { reason: string }) => void): this;
Expand All @@ -108,6 +110,8 @@ export interface IAmqpConnectionManager {

removeListener(event: string, listener: (...args: any[]) => void): this;

connect(options?: { timeout?: number }): Promise<void>;
reconnect(): void;
createChannel(options?: CreateChannelOpts): ChannelWrapper;
close(): Promise<void>;
isConnected(): boolean;
Expand Down Expand Up @@ -196,8 +200,43 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
this.setMaxListeners(0);

this._findServers = options.findServers || (() => Promise.resolve(urls));
}

/**
* Start the connect retries and await the first connect result. Even if the initial connect fails or timeouts, the
* reconnect attempts will continue in the background.
* @param [options={}] -
* @param [options.timeout] - Time to wait for initial connect
*/
async connect({ timeout }: { timeout?: number } = {}): Promise<void> {
this._connect();

let reject: (reason?: any) => void;
const onDisconnect = ({ err }: { err: any }) => {
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials.
if (err.isOperational) {
reject(err);
}
};

try {
await Promise.race([
once(this, 'connect'),
new Promise((_resolve, innerReject) => {
reject = innerReject;
this.on('disconnect', onDisconnect);
}),
...(timeout
? [
wait(timeout).promise.then(() => {
throw new Error('amqp-connection-manager: connect timeout');
}),
]
: []),
]);
} finally {
this.removeListener('disconnect', onDisconnect);
}
}

// `options` here are any options that can be passed to ChannelWrapper.
Expand Down
109 changes: 105 additions & 4 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ interface SendToQueueMessage {
reject: (err: Error) => void;
}

interface ConsumerOptions extends amqplib.Options.Consume {
prefetch?: number
}

interface Consumer {
consumerTag: string | null;
queue: string;
onMessage: (msg: amqplib.ConsumeMessage) => void;
options: ConsumerOptions;
}

type Message = PublishMessage | SendToQueueMessage;

const IRRECOVERABLE_ERRORS = [
Expand Down Expand Up @@ -87,6 +98,8 @@ export default class ChannelWrapper extends EventEmitter {
private _unconfirmedMessages: Message[] = [];
/** Reason code during publish or sendtoqueue messages. */
private _irrecoverableCode: number | undefined;
/** Consumers which will be reconnected on channel errors etc. */
private _consumers: Consumer[] = [];

/**
* The currently connected channel. Note that not all setup functions
Expand Down Expand Up @@ -324,6 +337,8 @@ export default class ChannelWrapper extends EventEmitter {

// Array of setup functions to call.
this._setups = [];
this._consumers = [];

if (options.setup) {
this._setups.push(options.setup);
}
Expand Down Expand Up @@ -359,10 +374,13 @@ export default class ChannelWrapper extends EventEmitter {
this.emit('error', err, { name: this.name });
})
)
).then(() => {
this._settingUp = undefined;
});

)
.then(() => {
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c)));
})
.then(() => {
this._settingUp = undefined;
});
await this._settingUp;

if (!this._channel) {
Expand Down Expand Up @@ -581,6 +599,89 @@ export default class ChannelWrapper extends EventEmitter {
}
}

/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
async consume(
queue: string,
onMessage: Consumer['onMessage'],
options: ConsumerOptions = {}
): Promise<void> {
const consumer: Consumer = {
consumerTag: null,
queue,
onMessage,
options,
};
this._consumers.push(consumer);
await this._consume(consumer);
}

private async _consume(consumer: Consumer): Promise<void> {
if (!this._channel) {
return;
}

const { prefetch, ...options } = consumer.options;
if (typeof prefetch === 'number') {
this._channel.prefetch(prefetch, false);
}

const { consumerTag } = await this._channel.consume(
consumer.queue,
(msg) => {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
// Ignore errors caused by queue not declared. In
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
});
return;
}
consumer.onMessage(msg);
},
options
);
consumer.consumerTag = consumerTag;
}

private async _reconnectConsumer(consumer: Consumer): Promise<void> {
if (!this._consumers.includes(consumer)) {
// Intentionally canceled
return;
}
await this._consume(consumer);
}

/**
* Cancel all consumers
*/
async cancelAll(): Promise<void> {
const consumers = this._consumers;
this._consumers = [];
if (!this._channel) {
return;
}

const channel = this._channel;
await Promise.all(
consumers.reduce<any[]>((acc, consumer) => {
if (consumer.consumerTag) {
acc.push(channel.cancel(consumer.consumerTag));
}
return acc;
}, [])
);
}

/** Send an `ack` to the underlying channel. */
ack(message: amqplib.Message, allUpTo?: boolean): void {
this._channel && this._channel.ack(message, allUpTo);
Expand Down
8 changes: 7 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ export function connect(
urls: ConnectionUrl | ConnectionUrl[] | undefined | null,
options?: AmqpConnectionManagerOptions
): IAmqpConnectionManager {
return new AmqpConnectionManager(urls, options);
const conn = new AmqpConnectionManager(urls, options);
conn.connect().catch(() => {
/* noop */
});
return conn;
}

export { AmqpConnectionManager as AmqpConnectionManagerClass };

const amqp = { connect };

export default amqp;
Loading