Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Move connection logic out of constructor
Browse files Browse the repository at this point in the history
Add setup method in AMQPConnector so that connection can be awaiting before subscribing to events.
RabbitEventBus's connect method is now public and should be called as part of setup
  • Loading branch information
norisalsaadie authored and erezmus committed Jan 22, 2020
1 parent ad1bb6a commit a7a735c
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 55 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@libero/event-bus",
"version": "0.1.4",
"version": "0.2.0",
"description": "",
"main": "dist/index.js",
"scripts": {
Expand Down
39 changes: 23 additions & 16 deletions src/rabbit-event-bus/amqp-connector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import * as flushPromises from 'flush-promises';
import AMQPConnector from './amqp-connector';
import { StateChange } from './types';
import { channel } from 'rs-channel-node';
import { Event } from 'event-bus';
import { InfraLogger as logger } from '../logger';
import { EventUtils } from './event-utils';
import { Event } from '../event-bus/index';

jest.mock('amqplib');
jest.mock('../logger');
Expand Down Expand Up @@ -41,7 +41,8 @@ describe('AMQP connector', () => {

const sender = jest.fn();
const receiver = async (): Promise<StateChange> => ({} as StateChange);
new AMQPConnector(url, [sender, receiver], [], [], 'service');
const connector = new AMQPConnector(url, [sender, receiver], 'service');
await connector.setup([], []);

await flushPromises();
expect(connect).toHaveBeenCalledTimes(1);
Expand All @@ -56,7 +57,8 @@ describe('AMQP connector', () => {
const sender = jest.fn();
const receiver = async (): Promise<StateChange> => ({} as StateChange);

new AMQPConnector(url, [sender, receiver], [], [], 'service');
const connector = new AMQPConnector(url, [sender, receiver], 'service');
await connector.setup([], []);

await flushPromises();
expect(sender).toHaveBeenCalledTimes(1);
Expand All @@ -68,7 +70,8 @@ describe('AMQP connector', () => {
const mockConnection = makeConnection(mockChannel);

(connect as jest.Mock).mockImplementation(async (): Promise<Connection> => mockConnection);
new AMQPConnector(url, channel(), ['test:event'], [], 'service');
const connector = new AMQPConnector(url, channel(), 'service');
connector.setup(['test:event'], []);

await flushPromises();
expect(mockChannel.assertExchange).toHaveBeenCalledTimes(1);
Expand All @@ -81,7 +84,8 @@ describe('AMQP connector', () => {
const eventType = 'test:event';

(connect as jest.Mock).mockImplementation(async (): Promise<Connection> => mockConnection);
const conn = new AMQPConnector(url, channel(), [], [{ eventType, handler: jest.fn() }], 'service');
const connector = new AMQPConnector(url, channel(), 'service');
connector.setup([], [{ eventType, handler: jest.fn() }]);

await flushPromises();
expect(mockChannel.assertQueue).toHaveBeenCalledTimes(1);
Expand All @@ -94,8 +98,8 @@ describe('AMQP connector', () => {
);
expect(mockChannel.consume).toHaveBeenCalledTimes(1);
expect(mockChannel.consume.mock.calls[0][0]).toBe('consumer__test:event__service');
expect(conn.subscribedEvents).toHaveLength(1);
expect(conn.subscribedEvents[0]).toBe('test:event');
expect(connector.subscribedEvents).toHaveLength(1);
expect(connector.subscribedEvents[0]).toBe('test:event');
});
});

Expand All @@ -106,7 +110,8 @@ describe('AMQP connector', () => {

const sender = jest.fn();
const receiver = async (): Promise<StateChange> => ({} as StateChange);
const connector = new AMQPConnector(url, [sender, receiver], [], [], 'service');
const connector = new AMQPConnector(url, [sender, receiver], 'service');
await connector.setup([], []);

await flushPromises();
connector.destroy();
Expand All @@ -128,14 +133,12 @@ describe('AMQP connector', () => {
created: new Date(),
payload: { data: 'payload' },
};

(connect as jest.Mock).mockImplementation(async (): Promise<Connection> => mockConnection);
const connector = new AMQPConnector(url, channel(), ['test:event'], [], 'service');

const connector = new AMQPConnector(url, channel(), 'service');
connector.setup(['test:event'], []);
// we need to wait for connection to be stored before we can publish
await flushPromises();
await connector.publish(event as Event);

await flushPromises();
expect(mockChannel.publish).toHaveBeenCalledTimes(1);
expect(mockChannel.publish.mock.calls[0][0]).toBe('event__test:event');
Expand All @@ -161,7 +164,8 @@ describe('AMQP connector', () => {
const mockConnection = makeConnection(mockChannel);

(connect as jest.Mock).mockImplementation(async (): Promise<Connection> => mockConnection);
const connector = new AMQPConnector(url, channel(), [], [], 'service');
const connector = new AMQPConnector(url, channel(), 'service');
await connector.setup([], []);

await flushPromises();
await connector.subscribe('test:event', jest.fn());
Expand Down Expand Up @@ -190,7 +194,8 @@ describe('AMQP connector', () => {
const mockConnection = makeConnection(mockChannel);

(connect as jest.Mock).mockImplementation(async (): Promise<Connection> => mockConnection);
const connector = new AMQPConnector(url, channel(), [], [], 'service');
const connector = new AMQPConnector(url, channel(), 'service');
await connector.setup([], []);
const handler = jest.fn().mockImplementation(async () => Promise.resolve(true));

await flushPromises();
Expand Down Expand Up @@ -226,7 +231,8 @@ describe('AMQP connector', () => {
const mockConnection = makeConnection(mockChannel);

(connect as jest.Mock).mockImplementation(async (): Promise<Connection> => mockConnection);
const connector = new AMQPConnector(url, channel(), [], [], 'service');
const connector = new AMQPConnector(url, channel(), 'service');
await connector.setup([], []);
const handler = jest.fn().mockImplementation(async () => Promise.resolve());

await flushPromises();
Expand Down Expand Up @@ -266,7 +272,8 @@ describe('AMQP connector', () => {
const mockConnection = makeConnection(mockChannel);

(connect as jest.Mock).mockImplementation(async (): Promise<Connection> => mockConnection);
const connector = new AMQPConnector(url, channel(), [], [], 'service');
const connector = new AMQPConnector(url, channel(), 'service');
await connector.setup([], []);
const handler = jest.fn().mockImplementation(async () => Promise.resolve());

await flushPromises();
Expand Down
26 changes: 14 additions & 12 deletions src/rabbit-event-bus/amqp-connector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,17 @@ export default class AMQPConnector {
private destroyed = false;
private subscriptions: EventType[] = [];

public constructor(
url: string,
[sender]: Channel<StateChange>,
eventDefs: string[],
subscriptions: Array<Subscription>,
serviceName: string,
) {
public constructor(private readonly url: string, [sender]: Channel<StateChange>, serviceName: string) {
this.externalConnector = { send: sender };
this.serviceName = serviceName;
}

public async setup(eventDefs: string[], subscriptions: Array<Subscription>): Promise<void> {
// Set up the connections to the AMQP server
this.connect(url)
return this.connect(this.url)
.then(async connection => {
this.connection = connection;
this.setupExchanges(eventDefs, subscriptions);
await this.setupExchanges(eventDefs, subscriptions);
})
.catch(() => {
// notify the manager object that the connection has failed
Expand Down Expand Up @@ -64,7 +60,6 @@ export default class AMQPConnector {
const exName = EventUtils.makeEventExchangeName(eventType);

await rabbitChannel.bindQueue(qName, exName, '');
logger.trace('subscribe');
this.subscriptions.push(eventType);

await rabbitChannel.consume(qName, async (msg: Message) => {
Expand All @@ -75,9 +70,16 @@ export default class AMQPConnector {
logger.fatal(`Can't create subscriber queues for: ${this.serviceName} using event: ${eventType}`);
});
} else {
const retryIn = 1000;
const hostname = this.url.indexOf('@') > -1 ? this.url.split('@')[1] : this.url;
logger.warn(`No connection, can't subscribe to ${hostname}. Trying again in ${retryIn} milliseconds`);
// Do we want to handle reconnects &/or retries here?
setTimeout(() => this.subscribe(eventType, handler), 1000);
logger.warn("No connection, can't subscribe, trying again soon!");
return new Promise(resolve =>
setTimeout(async () => {
await this.subscribe(eventType, handler);
resolve();
}, retryIn),
);
}
}

Expand Down
40 changes: 25 additions & 15 deletions src/rabbit-event-bus/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ describe('AMQP Connection Manager', () => {
describe('destroy', () => {
it('calls connector destroy method', async () => {
const destroyMock = jest.fn();
(AMQPConnector as jest.Mock).mockImplementation(() => ({ destroy: destroyMock }));
const manager = await new RabbitEventBus({ url: '' }, [], '');

const setupMock = jest.fn();
(AMQPConnector as jest.Mock).mockImplementation(() => ({ destroy: destroyMock, setup: setupMock }));
const manager = new RabbitEventBus({ url: '' }, [], '');
await manager.connect();
await manager.destroy();

expect(destroyMock).toHaveBeenCalledTimes(1);
Expand All @@ -23,16 +24,18 @@ describe('AMQP Connection Manager', () => {
describe('behaviour in a good connection state', () => {
it('forwards messages to a connector', async () => {
const publishMock = jest.fn(async () => true);
(AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>) => {
(AMQPConnector as jest.Mock).mockImplementation((_, [send]: Channel<StateChange>) => {
send({
newState: 'CONNECTED',
});
return {
setup: jest.fn(),
publish: publishMock,
subscribe: jest.fn(),
};
});
const manager = await new RabbitEventBus({ url: '' }, [], '');
await manager.connect();
await manager.publish({
eventType: 'test',
id: 'something',
Expand All @@ -46,20 +49,22 @@ describe('AMQP Connection Manager', () => {
it("passes on subscribes to the connector immediately, while it's ready", async () => {
const subscribeMock = jest.fn();
const [readyNotify, readyWait] = channel<{}>();
(AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>) => {
(AMQPConnector as jest.Mock).mockImplementation((_, [send]: Channel<StateChange>) => {
send({
newState: 'CONNECTED',
});

readyNotify({});
return {
publish: jest.fn(),
setup: jest.fn(),
subscribe: subscribeMock,
};
});

const manager = await new RabbitEventBus({ url: '' }, [], '');

await manager.connect();
await manager.subscribe('test', jest.fn());

await readyWait();
Expand All @@ -72,7 +77,7 @@ describe('AMQP Connection Manager', () => {
// This channel is used to simulate startup delay in the connector
const [readyNotify, readyWait] = channel<{}>();

(AMQPConnector as jest.Mock).mockImplementation((___, [send]: Channel<StateChange>, __, subscriptions) => {
(AMQPConnector as jest.Mock).mockImplementation((_, [send]: Channel<StateChange>) => {
send({
newState: 'CONNECTED',
});
Expand All @@ -82,13 +87,14 @@ describe('AMQP Connection Manager', () => {
});
});
return {
subscriptions,
setup: jest.fn(),
publish: publishMock,
subscribe: jest.fn(),
};
});

const manager = await new RabbitEventBus({ url: '' }, [], '');
await manager.connect();

Promise.all([
manager.publish({
Expand Down Expand Up @@ -129,7 +135,7 @@ describe('AMQP Connection Manager', () => {
// This channel is used to simulate startup delay in the connector
const [readyNotify, readyWait] = channel<{}>();

(AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>, _2, subscriptions) => {
(AMQPConnector as jest.Mock).mockImplementation((_, [send]: Channel<StateChange>) => {
send({
newState: 'CONNECTED',
});
Expand All @@ -139,14 +145,15 @@ describe('AMQP Connection Manager', () => {
});
});
return {
subscriptions,
connect: connectMock,
publish: jest.fn(),
setup: jest.fn(),
subscribe: subscribeMock,
};
});

const manager = await new RabbitEventBus({ url: '' }, [], '');
await manager.connect();

await manager.subscribe('test', jest.fn());
await manager.subscribe('test', jest.fn());
Expand All @@ -173,21 +180,22 @@ describe('AMQP Connection Manager', () => {

(AMQPConnector as jest.Mock).mockImplementation(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
(_0, [send, _1]: Channel<StateChange>, _2, subscriptions) => {
(_, [send, _1]: Channel<StateChange>) => {
send({
newState: 'NOT_CONNECTED',
});

return {
subscriptions,
connect: connectMock,
publish: jest.fn(() => false),
setup: jest.fn(),
subscribe: subscribeMock,
};
},
);

const manager = await new RabbitEventBus({ url: '' }, [], '');
await manager.connect();
const then = jest.fn();

manager
Expand Down Expand Up @@ -226,19 +234,20 @@ describe('AMQP Connection Manager', () => {
it('publish promises are resolved after a successful connection', async done => {
const subscribeMock = jest.fn();

(AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>, _2, subscriptions) => {
(AMQPConnector as jest.Mock).mockImplementation((_, [send]: Channel<StateChange>) => {
send({
newState: 'CONNECTED',
});

return {
subscriptions,
publish: jest.fn(() => true),
subscribe: subscribeMock,
setup: jest.fn(),
};
});

const manager = await new RabbitEventBus({ url: '' }, [], '');
await manager.connect();
const then = jest.fn();

manager
Expand Down Expand Up @@ -280,20 +289,21 @@ describe('AMQP Connection Manager', () => {
let returnState: ConnectedState = 'NOT_CONNECTED';
let returnPublish = false;

(AMQPConnector as jest.Mock).mockImplementation((__, [send]: Channel<StateChange>, _2, subscriptions) => {
(AMQPConnector as jest.Mock).mockImplementation((_, [send]: Channel<StateChange>) => {
send({
newState: returnState,
});

return {
subscriptions,
connect: connectMock,
publish: jest.fn(() => returnPublish),
subscribe: subscribeMock,
setup: jest.fn(),
};
});

const manager = await new RabbitEventBus({ url: '' }, [], '');
manager.connect();
const then = jest.fn();

manager
Expand Down
Loading

0 comments on commit a7a735c

Please sign in to comment.