Skip to content

Commit

Permalink
Merge pull request jwalton#179 from luddd3/feat/reconnect-consumers
Browse files Browse the repository at this point in the history
feat: reconnect and cancelAll consumers
  • Loading branch information
jwalton authored Aug 27, 2021
2 parents b2c89ac + 9caf187 commit 8655a33
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 7 deletions.
109 changes: 105 additions & 4 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ interface SendToQueueMessage {
reject: (err: Error) => void;
}

interface ConsumerOptions extends amqplib.Options.Consume {
prefetch?: number
}

interface Consumer {
consumerTag: string | null;
queue: string;
onMessage: (msg: amqplib.ConsumeMessage) => void;
options: ConsumerOptions;
}

type Message = PublishMessage | SendToQueueMessage;

const IRRECOVERABLE_ERRORS = [
Expand Down Expand Up @@ -87,6 +98,8 @@ export default class ChannelWrapper extends EventEmitter {
private _unconfirmedMessages: Message[] = [];
/** Reason code during publish or sendtoqueue messages. */
private _irrecoverableCode: number | undefined;
/** Consumers which will be reconnected on channel errors etc. */
private _consumers: Consumer[] = [];

/**
* The currently connected channel. Note that not all setup functions
Expand Down Expand Up @@ -324,6 +337,8 @@ export default class ChannelWrapper extends EventEmitter {

// Array of setup functions to call.
this._setups = [];
this._consumers = [];

if (options.setup) {
this._setups.push(options.setup);
}
Expand Down Expand Up @@ -359,10 +374,13 @@ export default class ChannelWrapper extends EventEmitter {
this.emit('error', err, { name: this.name });
})
)
).then(() => {
this._settingUp = undefined;
});

)
.then(() => {
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c)));
})
.then(() => {
this._settingUp = undefined;
});
await this._settingUp;

if (!this._channel) {
Expand Down Expand Up @@ -581,6 +599,89 @@ export default class ChannelWrapper extends EventEmitter {
}
}

/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
async consume(
queue: string,
onMessage: Consumer['onMessage'],
options: ConsumerOptions = {}
): Promise<void> {
const consumer: Consumer = {
consumerTag: null,
queue,
onMessage,
options,
};
this._consumers.push(consumer);
await this._consume(consumer);
}

private async _consume(consumer: Consumer): Promise<void> {
if (!this._channel) {
return;
}

const { prefetch, ...options } = consumer.options;
if (typeof prefetch === 'number') {
this._channel.prefetch(prefetch, false);
}

const { consumerTag } = await this._channel.consume(
consumer.queue,
(msg) => {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
// Ignore errors caused by queue not declared. In
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
});
return;
}
consumer.onMessage(msg);
},
options
);
consumer.consumerTag = consumerTag;
}

private async _reconnectConsumer(consumer: Consumer): Promise<void> {
if (!this._consumers.includes(consumer)) {
// Intentionally canceled
return;
}
await this._consume(consumer);
}

/**
* Cancel all consumers
*/
async cancelAll(): Promise<void> {
const consumers = this._consumers;
this._consumers = [];
if (!this._channel) {
return;
}

const channel = this._channel;
await Promise.all(
consumers.reduce<any[]>((acc, consumer) => {
if (consumer.consumerTag) {
acc.push(channel.cancel(consumer.consumerTag));
}
return acc;
}, [])
);
}

/** Send an `ack` to the underlying channel. */
ack(message: amqplib.Message, allUpTo?: boolean): void {
this._channel && this._channel.ack(message, allUpTo);
Expand Down
178 changes: 175 additions & 3 deletions test/ChannelWrapperTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,7 @@ describe('ChannelWrapper', function () {
});
});

it('should publish messages to the underlying channel with callbacks', function (done: (
err?: Error
) => void) {
it('should publish messages to the underlying channel with callbacks', function (done) {
connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager);
channelWrapper.waitForConnect(function (err) {
Expand Down Expand Up @@ -970,6 +968,180 @@ describe('ChannelWrapper', function () {
// Final message should have been published to the underlying queue.
expect(queue.length).to.equal(2);
});

it('should consume messages', async function () {
let onMessage: any = null;

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => {
onMessage = onMsg;
return Promise.resolve({ consumerTag: 'abc' });
});
},
});
await channelWrapper.waitForConnect();

const messages: any[] = [];
await channelWrapper.consume(
'queue',
(msg) => {
messages.push(msg);
},
{ noAck: true }
);

onMessage(1);
onMessage(2);
onMessage(3);
expect(messages).to.deep.equal([1, 2, 3]);
});

it('should reconnect consumer on consumer cancellation', async function () {
let onMessage: any = null;
let consumerTag = 0;

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => {
onMessage = onMsg;
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
},
});
await channelWrapper.waitForConnect();

const messages: any[] = [];
await channelWrapper.consume('queue', (msg) => {
messages.push(msg);
});

onMessage(1);
onMessage(null); // simulate consumer cancel
onMessage(2);
onMessage(null); // simulate second cancel
onMessage(3);

expect(messages).to.deep.equal([1, 2, 3]);
expect(consumerTag).to.equal(3);
});

it('should reconnect consumers on channel error', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
let consumerTag = 0;

// Define a prefetch function here, because it will otherwise be
// unique for each new channel
const prefetchFn = jest
.fn()
.mockImplementation((_prefetch: number, _isGlobal: boolean) => {});

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.prefetch = prefetchFn;
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
await channelWrapper.consume(
'queue1',
(msg) => {
queue1.push(msg);
},
{ noAck: true, prefetch: 10 },
);

const queue2: any[] = [];
await channelWrapper.consume('queue2', (msg) => {
queue2.push(msg);
});

onQueue1(1);
onQueue2(1);

connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

onQueue1(2);
onQueue2(2);

expect(queue1).to.deep.equal([1, 2]);
expect(queue2).to.deep.equal([1, 2]);
expect(consumerTag).to.equal(4);
expect(prefetchFn).to.have.beenCalledTimes(2);
expect(prefetchFn).to.have.beenNthCalledWith(1, 10, false);
expect(prefetchFn).to.have.beenNthCalledWith(2, 10, false);
});

it('should be able to cancel all consumers', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
let consumerTag = 0;
const canceledTags: number[] = [];

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
channel.cancel = jest.fn().mockImplementation((consumerTag) => {
canceledTags.push(consumerTag);
if (consumerTag === '0') {
onQueue1(null);
} else if (consumerTag === '1') {
onQueue2(null);
}
return Promise.resolve();
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
await channelWrapper.consume('queue1', (msg) => {
queue1.push(msg);
});

const queue2: any[] = [];
await channelWrapper.consume('queue2', (msg) => {
queue2.push(msg);
});

onQueue1(1);
onQueue2(1);

await channelWrapper.cancelAll();

// Consumers shouldn't be resumed after reconnect when canceled
connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

expect(queue1).to.deep.equal([1]);
expect(queue2).to.deep.equal([1]);
expect(consumerTag).to.equal(2);
expect(canceledTags).to.deep.equal(['0', '1']);
});
});

/** Returns the arguments of the most recent call to this mock. */
Expand Down
6 changes: 6 additions & 0 deletions test/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ export class FakeConfirmChannel extends EventEmitter {
close = jest.fn().mockImplementation(async (): Promise<void> => {
this.emit('close');
});

consume = jest.fn().mockImplementation(async (): Promise<Replies.Consume> => {
return { consumerTag: 'abc' };
});

prefetch = jest.fn().mockImplementation((_prefetch: number, _isGlobal: boolean): void => {});
}

export class FakeConnection extends EventEmitter {
Expand Down

0 comments on commit 8655a33

Please sign in to comment.