From 823fbab75dfc302e4727f2ad4bd2dad179c15704 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 30 Jan 2025 13:52:50 +0100 Subject: [PATCH 1/6] feat(microservices): add support for topic exchange (rabbitmq) --- .../e2e/topic-exchange-rmq.spec.ts | 38 +++++++ .../src/rmq/topic-exchange-rmq.controller.ts | 36 ++++++ packages/microservices/client/client-rmq.ts | 107 +++++++++++------- .../microservice-configuration.interface.ts | 7 ++ packages/microservices/server/server-rmq.ts | 90 +++++++++++++-- tools/gulp/tasks/move.ts | 6 +- 6 files changed, 227 insertions(+), 57 deletions(-) create mode 100644 integration/microservices/e2e/topic-exchange-rmq.spec.ts create mode 100644 integration/microservices/src/rmq/topic-exchange-rmq.controller.ts diff --git a/integration/microservices/e2e/topic-exchange-rmq.spec.ts b/integration/microservices/e2e/topic-exchange-rmq.spec.ts new file mode 100644 index 00000000000..ac64ccd68cc --- /dev/null +++ b/integration/microservices/e2e/topic-exchange-rmq.spec.ts @@ -0,0 +1,38 @@ +import { INestApplication } from '@nestjs/common'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; +import { Test } from '@nestjs/testing'; +import * as request from 'supertest'; +import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller'; + +describe('RabbitMQ transport (Topic Exchange)', () => { + let server: any; + let app: INestApplication; + + beforeEach(async () => { + const module = await Test.createTestingModule({ + controllers: [RMQTopicExchangeController], + }).compile(); + + app = module.createNestApplication(); + server = app.getHttpAdapter().getInstance(); + + app.connectMicroservice({ + transport: Transport.RMQ, + options: { + urls: [`amqp://0.0.0.0:5672`], + queue: 'test', + topicExchange: 'test', + }, + }); + await app.startAllMicroservices(); + await app.init(); + }); + + it(`should send message to wildcard topic exchange`, () => { + return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b'); + }); + + afterEach(async () => { + await app.close(); + }); +}); diff --git a/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts new file mode 100644 index 00000000000..53474a10c98 --- /dev/null +++ b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts @@ -0,0 +1,36 @@ +import { Controller, Get } from '@nestjs/common'; +import { + ClientProxy, + ClientProxyFactory, + Ctx, + MessagePattern, + RmqContext, + Transport, +} from '@nestjs/microservices'; +import { lastValueFrom } from 'rxjs'; + +@Controller() +export class RMQTopicExchangeController { + client: ClientProxy; + + constructor() { + this.client = ClientProxyFactory.create({ + transport: Transport.RMQ, + options: { + urls: [`amqp://localhost:5672`], + queue: 'test', + topicExchange: 'test', + }, + }); + } + + @Get('topic-exchange') + async topicExchange() { + return lastValueFrom(this.client.send('wildcard.a.b', 1)); + } + + @MessagePattern('wildcard.*.*') + handleTopicExchange(@Ctx() ctx: RmqContext): string { + return ctx.getPattern(); + } +} diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index fa75d0fc33c..15b7ae88fbd 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -1,7 +1,7 @@ import { Logger } from '@nestjs/common/services/logger.service'; import { loadPackage } from '@nestjs/common/utils/load-package.util'; import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util'; -import { isFunction } from '@nestjs/common/utils/shared.utils'; +import { isFunction, isString } from '@nestjs/common/utils/shared.utils'; import { EventEmitter } from 'events'; import { EmptyError, @@ -55,8 +55,8 @@ export class ClientRMQ extends ClientProxy { protected readonly logger = new Logger(ClientProxy.name); protected connection$: ReplaySubject; protected connectionPromise: Promise; - protected client: AmqpConnectionManager = null; - protected channel: ChannelWrapper = null; + protected client: AmqpConnectionManager | null = null; + protected channel: ChannelWrapper | null = null; protected pendingEventListeners: Array<{ event: keyof RmqEvents; callback: RmqEvents[keyof RmqEvents]; @@ -113,7 +113,7 @@ export class ClientRMQ extends ClientProxy { this.registerDisconnectListener(this.client); this.registerConnectListener(this.client); this.pendingEventListeners.forEach(({ event, callback }) => - this.client.on(event, callback), + this.client!.on(event, callback), ); this.pendingEventListeners = []; @@ -140,7 +140,7 @@ export class ClientRMQ extends ClientProxy { public createChannel(): Promise { return new Promise(resolve => { - this.channel = this.client.createChannel({ + this.channel = this.client!.createChannel({ json: false, setup: (channel: Channel) => this.setupChannel(channel, resolve), }); @@ -224,8 +224,8 @@ export class ClientRMQ extends ClientProxy { const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK); await channel.consume( this.replyQueue, - (msg: ConsumeMessage) => - this.responseEmitter.emit(msg.properties.correlationId, msg), + (msg: ConsumeMessage | null) => + this.responseEmitter.emit(msg!.properties.correlationId, msg), { noAck, }, @@ -359,23 +359,35 @@ export class ClientRMQ extends ClientProxy { delete serializedPacket.options; this.responseEmitter.on(correlationId, listener); - this.channel - .sendToQueue( - this.queue, - Buffer.from(JSON.stringify(serializedPacket)), - { - replyTo: this.replyQueue, - persistent: this.getOptionsProp( - this.options, - 'persistent', - RQM_DEFAULT_PERSISTENT, - ), - ...options, - headers: this.mergeHeaders(options?.headers), - correlationId, - }, - ) - .catch(err => callback({ err })); + + const content = Buffer.from(JSON.stringify(serializedPacket)); + const sendOptions = { + replyTo: this.replyQueue, + persistent: this.getOptionsProp( + this.options, + 'persistent', + RQM_DEFAULT_PERSISTENT, + ), + ...options, + headers: this.mergeHeaders(options?.headers), + correlationId, + }; + + if (this.options.topicExchange) { + const stringifiedPattern = isString(message.pattern) + ? message.pattern + : JSON.stringify(message.pattern); + this.channel!.publish( + this.options.topicExchange, + stringifiedPattern, + content, + sendOptions, + ).catch(err => callback({ err })); + } else { + this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err => + callback({ err }), + ); + } return () => this.responseEmitter.removeListener(correlationId, listener); } catch (err) { callback({ err }); @@ -390,22 +402,37 @@ export class ClientRMQ extends ClientProxy { const options = serializedPacket.options; delete serializedPacket.options; - return new Promise((resolve, reject) => - this.channel.sendToQueue( - this.queue, - Buffer.from(JSON.stringify(serializedPacket)), - { - persistent: this.getOptionsProp( - this.options, - 'persistent', - RQM_DEFAULT_PERSISTENT, - ), - ...options, - headers: this.mergeHeaders(options?.headers), - }, - (err: unknown) => (err ? reject(err as Error) : resolve()), - ), - ); + return new Promise((resolve, reject) => { + const content = Buffer.from(JSON.stringify(serializedPacket)); + const sendOptions = { + persistent: this.getOptionsProp( + this.options, + 'persistent', + RQM_DEFAULT_PERSISTENT, + ), + ...options, + headers: this.mergeHeaders(options?.headers), + }; + const errorCallback = (err: unknown) => + err ? reject(err as Error) : resolve(); + + return this.options.topicExchange + ? this.channel!.publish( + this.options.topicExchange, + isString(packet.pattern) + ? packet.pattern + : JSON.stringify(packet.pattern), + content, + sendOptions, + errorCallback, + ) + : this.channel!.sendToQueue( + this.queue, + content, + sendOptions, + errorCallback, + ); + }); } protected initializeSerializer(options: RmqOptions['options']) { diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index 0c371076bae..079dcb4ef66 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -231,6 +231,13 @@ export interface RmqOptions { persistent?: boolean; headers?: Record; noAssert?: boolean; + /** + * Set only if you want to use Topic Exchange for routing messages to queues. + * Enabling this will allow you to use wildcards (*, #) as message and event patterns. + * Topic exchange can have any arbitrary name, but it should be the same for the producer (client) and consumer (server). + * @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange + */ + topicExchange?: string; /** * Maximum number of connection attempts. * Applies only to the consumer configuration. diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 20fdea33e31..068714d73d5 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -21,7 +21,7 @@ import { RmqContext } from '../ctx-host'; import { Transport } from '../enums'; import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events'; import { RmqUrl } from '../external/rmq-url.interface'; -import { RmqOptions } from '../interfaces'; +import { MessageHandler, RmqOptions } from '../interfaces'; import { IncomingRequest, OutgoingResponse, @@ -53,13 +53,14 @@ const INFINITE_CONNECTION_ATTEMPTS = -1; export class ServerRMQ extends Server { public readonly transportId = Transport.RMQ; - protected server: AmqpConnectionManager = null; - protected channel: ChannelWrapper = null; + protected server: AmqpConnectionManager | null = null; + protected channel: ChannelWrapper | null = null; protected connectionAttempts = 0; protected readonly urls: string[] | RmqUrl[]; protected readonly queue: string; protected readonly noAck: boolean; protected readonly queueOptions: any; + protected readonly wildcardHandlers = new Map(); protected pendingEventListeners: Array<{ event: keyof RmqEvents; callback: RmqEvents[keyof RmqEvents]; @@ -106,12 +107,12 @@ export class ServerRMQ extends Server { callback?: (err?: unknown, ...optionalParams: unknown[]) => void, ) { this.server = this.createClient(); - this.server.once(RmqEventsMap.CONNECT, () => { + this.server!.once(RmqEventsMap.CONNECT, () => { if (this.channel) { return; } this._status$.next(RmqStatus.CONNECTED); - this.channel = this.server.createChannel({ + this.channel = this.server!.createChannel({ json: false, setup: (channel: any) => this.setupChannel(channel, callback!), }); @@ -126,12 +127,12 @@ export class ServerRMQ extends Server { this.registerConnectListener(); this.registerDisconnectListener(); this.pendingEventListeners.forEach(({ event, callback }) => - this.server.on(event, callback), + this.server!.on(event, callback), ); this.pendingEventListeners = []; const connectFailedEvent = 'connectFailed'; - this.server.once(connectFailedEvent, (error: Record) => { + this.server!.once(connectFailedEvent, (error: Record) => { this._status$.next(RmqStatus.DISCONNECTED); this.logger.error(CONNECTION_FAILED_MESSAGE); @@ -162,13 +163,13 @@ export class ServerRMQ extends Server { } private registerConnectListener() { - this.server.on(RmqEventsMap.CONNECT, (err: any) => { + this.server!.on(RmqEventsMap.CONNECT, (err: any) => { this._status$.next(RmqStatus.CONNECTED); }); } private registerDisconnectListener() { - this.server.on(RmqEventsMap.DISCONNECT, (err: any) => { + this.server!.on(RmqEventsMap.DISCONNECT, (err: any) => { this._status$.next(RmqStatus.DISCONNECTED); this.logger.error(DISCONNECTED_RMQ_MESSAGE); this.logger.error(err); @@ -207,6 +208,21 @@ export class ServerRMQ extends Server { ); } + // When "Topic exchange" is used, we need to bind the queue to the exchange + // with all the routing keys used by the handlers + if (this.options.topicExchange) { + const routingKeys = Array.from(this.getHandlers().keys()); + await Promise.all( + routingKeys.map(routingKey => + channel.bindQueue(this.queue, this.options.topicExchange, routingKey), + ), + ); + + // "Topic exchange" supports wildcards, so we need to initialize wildcard handlers + // otherwise we would not be able to associate the incoming messages with the handlers + this.initializeWildcardHandlersIfExist(); + } + await channel.prefetch(prefetchCount, isGlobalPrefetchCount); channel.consume( this.queue, @@ -246,7 +262,7 @@ export class ServerRMQ extends Server { if (!handler) { if (!this.noAck) { this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`); - this.channel.nack(rmqContext.getMessage() as Message, false, false); + this.channel!.nack(rmqContext.getMessage() as Message, false, false); } const status = 'error'; const noHandlerPacket = { @@ -277,7 +293,7 @@ export class ServerRMQ extends Server { ): Promise { const handler = this.getHandlerByPattern(pattern); if (!handler && !this.noAck) { - this.channel.nack(context.getMessage() as Message, false, false); + this.channel!.nack(context.getMessage() as Message, false, false); return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`); } return super.handleEvent(pattern, packet, context); @@ -295,7 +311,8 @@ export class ServerRMQ extends Server { delete outgoingResponse.options; const buffer = Buffer.from(JSON.stringify(outgoingResponse)); - this.channel.sendToQueue(replyTo, buffer, { correlationId, ...options }); + const sendOptions = { correlationId, ...options }; + this.channel!.sendToQueue(replyTo, buffer, sendOptions); } public unwrap(): T { @@ -318,6 +335,31 @@ export class ServerRMQ extends Server { } } + public getHandlerByPattern(pattern: string): MessageHandler | null { + if (!this.options.topicExchange) { + // When "Topic exchange" is not used, wildcards are not supported + // so we can fallback to the default behavior + return super.getHandlerByPattern(pattern); + } + + // Search for non-wildcard handler first + const handler = super.getHandlerByPattern(pattern); + if (handler) { + return handler; + } + + // Search for wildcard handler + if (this.wildcardHandlers.size === 0) { + return null; + } + for (const [regex, handler] of this.wildcardHandlers) { + if (regex.test(pattern)) { + return handler; + } + } + return null; + } + protected initializeSerializer(options: RmqOptions['options']) { this.serializer = options?.serializer ?? new RmqRecordSerializer(); } @@ -329,4 +371,28 @@ export class ServerRMQ extends Server { return content.toString(); } } + + private initializeWildcardHandlersIfExist() { + if (this.wildcardHandlers.size !== 0) { + return; + } + const handlers = this.getHandlers(); + + handlers.forEach((handler, pattern) => { + const regex = this.convertRoutingKeyToRegex(pattern); + if (regex) { + this.wildcardHandlers.set(regex, handler); + } + }); + } + + private convertRoutingKeyToRegex(routingKey: string): RegExp | undefined { + if (!routingKey.includes('#') && !routingKey.includes('*')) { + return; + } + let regexPattern = routingKey.replace(/\./g, '\\.'); + regexPattern = regexPattern.replace(/\*/g, '[^.]+'); + regexPattern = regexPattern.replace(/#/g, '.*'); + return new RegExp(`^${regexPattern}$`); + } } diff --git a/tools/gulp/tasks/move.ts b/tools/gulp/tasks/move.ts index 0991e1e6f84..666901c39a1 100644 --- a/tools/gulp/tasks/move.ts +++ b/tools/gulp/tasks/move.ts @@ -3,11 +3,7 @@ import { join } from 'path'; import { samplePath } from '../config'; import { containsPackageJson, getDirs } from '../util/task-helpers'; -const distFiles = src([ - 'packages/**/*', - '!packages/**/*.ts', - 'packages/**/*.d.ts', -]); +const distFiles = src(['packages/**/*.js', 'packages/**/*.d.ts']); /** * Moves the compiled nest files into "node_module" folder. From daae5e765af83cd24c94c4a48b964fcac531b706 Mon Sep 17 00:00:00 2001 From: Kamil Mysliwiec Date: Thu, 30 Jan 2025 13:58:32 +0100 Subject: [PATCH 2/6] Potential fix for code scanning alert no. 39: Incomplete string escaping or encoding Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- packages/microservices/server/server-rmq.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 068714d73d5..13784e558e3 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -390,7 +390,7 @@ export class ServerRMQ extends Server { if (!routingKey.includes('#') && !routingKey.includes('*')) { return; } - let regexPattern = routingKey.replace(/\./g, '\\.'); + let regexPattern = routingKey.replace(/\\/g, '\\\\').replace(/\./g, '\\.'); regexPattern = regexPattern.replace(/\*/g, '[^.]+'); regexPattern = regexPattern.replace(/#/g, '.*'); return new RegExp(`^${regexPattern}$`); From e1a9f5f0f3231dbe323067655582ff24d4d37d66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 30 Jan 2025 14:57:15 +0100 Subject: [PATCH 3/6] test: increase timeout for the wildcard topic exchange test --- integration/microservices/e2e/topic-exchange-rmq.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/microservices/e2e/topic-exchange-rmq.spec.ts b/integration/microservices/e2e/topic-exchange-rmq.spec.ts index ac64ccd68cc..22dc2ab628e 100644 --- a/integration/microservices/e2e/topic-exchange-rmq.spec.ts +++ b/integration/microservices/e2e/topic-exchange-rmq.spec.ts @@ -30,7 +30,7 @@ describe('RabbitMQ transport (Topic Exchange)', () => { it(`should send message to wildcard topic exchange`, () => { return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b'); - }); + }).timeout(10000); afterEach(async () => { await app.close(); From f2e9f9367be5b92b9114177df2cd543dbebd2851 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 30 Jan 2025 14:57:39 +0100 Subject: [PATCH 4/6] chore: disable lint rule for rmq producer and consumer --- packages/microservices/client/client-rmq.ts | 1 + packages/microservices/server/server-rmq.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 15b7ae88fbd..29f3dbb88b0 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/no-redundant-type-constituents */ import { Logger } from '@nestjs/common/services/logger.service'; import { loadPackage } from '@nestjs/common/utils/load-package.util'; import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util'; diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 068714d73d5..52fec5500c0 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/no-redundant-type-constituents */ import { isNil, isString, From 23e76fd29675488633c0fbd9c801ef0d0fa4584a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Thu, 30 Jan 2025 15:31:02 +0100 Subject: [PATCH 5/6] test: use unique queue name for rmq --- integration/microservices/e2e/topic-exchange-rmq.spec.ts | 2 +- .../microservices/src/rmq/topic-exchange-rmq.controller.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/microservices/e2e/topic-exchange-rmq.spec.ts b/integration/microservices/e2e/topic-exchange-rmq.spec.ts index 22dc2ab628e..61234475f9e 100644 --- a/integration/microservices/e2e/topic-exchange-rmq.spec.ts +++ b/integration/microservices/e2e/topic-exchange-rmq.spec.ts @@ -20,7 +20,7 @@ describe('RabbitMQ transport (Topic Exchange)', () => { transport: Transport.RMQ, options: { urls: [`amqp://0.0.0.0:5672`], - queue: 'test', + queue: 'test2', topicExchange: 'test', }, }); diff --git a/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts index 53474a10c98..0de76b27111 100644 --- a/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts +++ b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts @@ -18,7 +18,7 @@ export class RMQTopicExchangeController { transport: Transport.RMQ, options: { urls: [`amqp://localhost:5672`], - queue: 'test', + queue: 'test2', topicExchange: 'test', }, }); From d2948d952268cdc76d43f5df3ce7bd71daf44740 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Fri, 31 Jan 2025 08:50:10 +0100 Subject: [PATCH 6/6] refactor: introduce wildcards attribute, use existing exchange --- .../e2e/topic-exchange-rmq.spec.ts | 6 +- .../src/rmq/topic-exchange-rmq.controller.ts | 2 +- packages/microservices/client/client-rmq.ts | 35 ++++++++-- .../microservice-configuration.interface.ts | 70 +++++++++++++++++-- packages/microservices/server/server-rmq.ts | 60 +++++++++------- 5 files changed, 134 insertions(+), 39 deletions(-) diff --git a/integration/microservices/e2e/topic-exchange-rmq.spec.ts b/integration/microservices/e2e/topic-exchange-rmq.spec.ts index 61234475f9e..2a2bd1ff68e 100644 --- a/integration/microservices/e2e/topic-exchange-rmq.spec.ts +++ b/integration/microservices/e2e/topic-exchange-rmq.spec.ts @@ -4,7 +4,7 @@ import { Test } from '@nestjs/testing'; import * as request from 'supertest'; import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller'; -describe('RabbitMQ transport (Topic Exchange)', () => { +describe('RabbitMQ transport (Topic Exchange - wildcards)', () => { let server: any; let app: INestApplication; @@ -21,7 +21,7 @@ describe('RabbitMQ transport (Topic Exchange)', () => { options: { urls: [`amqp://0.0.0.0:5672`], queue: 'test2', - topicExchange: 'test', + wildcards: true, }, }); await app.startAllMicroservices(); @@ -30,7 +30,7 @@ describe('RabbitMQ transport (Topic Exchange)', () => { it(`should send message to wildcard topic exchange`, () => { return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b'); - }).timeout(10000); + }); afterEach(async () => { await app.close(); diff --git a/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts index 0de76b27111..f482ab9a05f 100644 --- a/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts +++ b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts @@ -19,7 +19,7 @@ export class RMQTopicExchangeController { options: { urls: [`amqp://localhost:5672`], queue: 'test2', - topicExchange: 'test', + wildcards: true, }, }); } diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 29f3dbb88b0..2985c169e9d 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -216,6 +216,22 @@ export class ClientRMQ extends ClientProxy { ); } + if (this.options.wildcards) { + const exchange = this.getOptionsProp( + this.options, + 'exchange', + this.options.queue, + ); + const exchangeType = this.getOptionsProp( + this.options, + 'exchangeType', + 'topic', + ); + await channel.assertExchange(exchange, exchangeType, { + durable: true, + }); + } + await channel.prefetch(prefetchCount, isGlobalPrefetchCount); await this.consumeChannel(channel); resolve(); @@ -374,12 +390,21 @@ export class ClientRMQ extends ClientProxy { correlationId, }; - if (this.options.topicExchange) { + if (this.options.wildcards) { const stringifiedPattern = isString(message.pattern) ? message.pattern : JSON.stringify(message.pattern); + + // The exchange is the same as the queue when wildcards are enabled + // and the exchange is not explicitly set + const exchange = this.getOptionsProp( + this.options, + 'exchange', + this.queue, + ); + this.channel!.publish( - this.options.topicExchange, + exchange, stringifiedPattern, content, sendOptions, @@ -417,9 +442,11 @@ export class ClientRMQ extends ClientProxy { const errorCallback = (err: unknown) => err ? reject(err as Error) : resolve(); - return this.options.topicExchange + return this.options.wildcards ? this.channel!.publish( - this.options.topicExchange, + // The exchange is the same as the queue when wildcards are enabled + // and the exchange is not explicitly set + this.getOptionsProp(this.options, 'exchange', this.queue), isString(packet.pattern) ? packet.pattern : JSON.stringify(packet.pattern), diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index 079dcb4ef66..c23a5839106 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -215,29 +215,89 @@ export interface NatsOptions { export interface RmqOptions { transport?: Transport.RMQ; options?: { + /** + * An array of connection URLs to try in order. + */ urls?: string[] | RmqUrl[]; + /** + * The name of the queue. + */ queue?: string; + /** + * A prefetch count for this channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; + * once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged. + */ prefetchCount?: number; + /** + * Sets the per-channel behavior for prefetching messages. + */ isGlobalPrefetchCount?: boolean; + /** + * Amqplib queue options. + * @see https://amqp-node.github.io/amqplib/channel_api.html#channel_assertQueue + */ queueOptions?: AmqplibQueueOptions; + /** + * AMQP Connection Manager socket options. + */ socketOptions?: AmqpConnectionManagerSocketOptions; - exchange?: string; - routingKey?: string; + /** + * Iif true, the broker won’t expect an acknowledgement of messages delivered to this consumer; i.e., it will dequeue messages as soon as they’ve been sent down the wire. + * @default false + */ noAck?: boolean; + /** + * A name which the server will use to distinguish message deliveries for the consumer; mustn’t be already in use on the channel. It’s usually easier to omit this, in which case the server will create a random name and supply it in the reply. + */ consumerTag?: string; + /** + * A serializer for the message payload. + */ serializer?: Serializer; + /** + * A deserializer for the message payload. + */ deserializer?: Deserializer; + /** + * A reply queue for the producer. + * @default 'amq.rabbitmq.reply-to' + */ replyQueue?: string; + /** + * If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts. + */ persistent?: boolean; + /** + * Additional headers to be sent with every message. + * Applies only to the producer configuration. + */ headers?: Record; + /** + * When false, a queue will not be asserted before consuming. + * @default false + */ noAssert?: boolean; /** - * Set only if you want to use Topic Exchange for routing messages to queues. + * Name for the exchange. Defaults to the queue name when "wildcards" is set to true. + * @default '' + */ + exchange?: string; + /** + * Type of the exchange + * @default 'topic' + */ + exchangeType?: 'direct' | 'fanout' | 'topic' | 'headers'; + /** + * Additional routing key for the topic exchange. + */ + routingKey?: string; + /** + * Set to true only if you want to use Topic Exchange for routing messages to queues. * Enabling this will allow you to use wildcards (*, #) as message and event patterns. - * Topic exchange can have any arbitrary name, but it should be the same for the producer (client) and consumer (server). * @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange + * @default false */ - topicExchange?: string; + wildcards?: boolean; /** * Maximum number of connection attempts. * Applies only to the consumer configuration. diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index b4044f07a50..08d3fe081d2 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -39,10 +39,12 @@ import { Server } from './server'; // import('amqp-connection-manager').AmqpConnectionManager; // type ChannelWrapper = import('amqp-connection-manager').ChannelWrapper; // type Message = import('amqplib').Message; +// type Channel = import('amqplib').Channel | import('amqplib').ConfirmChannel; type AmqpConnectionManager = any; type ChannelWrapper = any; type Message = any; +type Channel = any; let rmqPackage = {} as any; // as typeof import('amqp-connection-manager'); @@ -115,7 +117,7 @@ export class ServerRMQ extends Server { this._status$.next(RmqStatus.CONNECTED); this.channel = this.server!.createChannel({ json: false, - setup: (channel: any) => this.setupChannel(channel, callback!), + setup: (channel: Channel) => this.setupChannel(channel, callback!), }); }); @@ -177,7 +179,7 @@ export class ServerRMQ extends Server { }); } - public async setupChannel(channel: any, callback: Function) { + public async setupChannel(channel: Channel, callback: Function) { const noAssert = this.getOptionsProp(this.options, 'noAssert') ?? this.queueOptions.noAssert ?? @@ -198,36 +200,44 @@ export class ServerRMQ extends Server { RQM_DEFAULT_PREFETCH_COUNT, ); - if (this.options.exchange && this.options.routingKey) { - await channel.assertExchange(this.options.exchange, 'topic', { + if (this.options.exchange || this.options.wildcards) { + // Use queue name as exchange name if exchange is not provided and "wildcards" is set to true + const exchange = this.getOptionsProp( + this.options, + 'exchange', + this.options.queue, + ); + const exchangeType = this.getOptionsProp( + this.options, + 'exchangeType', + 'topic', + ); + await channel.assertExchange(exchange, exchangeType, { durable: true, }); - await channel.bindQueue( - this.queue, - this.options.exchange, - this.options.routingKey, - ); - } - // When "Topic exchange" is used, we need to bind the queue to the exchange - // with all the routing keys used by the handlers - if (this.options.topicExchange) { - const routingKeys = Array.from(this.getHandlers().keys()); - await Promise.all( - routingKeys.map(routingKey => - channel.bindQueue(this.queue, this.options.topicExchange, routingKey), - ), - ); + if (this.options.routingKey) { + await channel.bindQueue(this.queue, exchange, this.options.routingKey); + } - // "Topic exchange" supports wildcards, so we need to initialize wildcard handlers - // otherwise we would not be able to associate the incoming messages with the handlers - this.initializeWildcardHandlersIfExist(); + if (this.options.wildcards) { + const routingKeys = Array.from(this.getHandlers().keys()); + await Promise.all( + routingKeys.map(routingKey => + channel.bindQueue(this.queue, exchange, routingKey), + ), + ); + + // When "wildcards" is set to true, we need to initialize wildcard handlers + // otherwise we would not be able to associate the incoming messages with the handlers + this.initializeWildcardHandlersIfExist(); + } } await channel.prefetch(prefetchCount, isGlobalPrefetchCount); channel.consume( this.queue, - (msg: Record) => this.handleMessage(msg, channel), + (msg: Record | null) => this.handleMessage(msg!, channel), { noAck: this.noAck, consumerTag: this.getOptionsProp( @@ -337,9 +347,7 @@ export class ServerRMQ extends Server { } public getHandlerByPattern(pattern: string): MessageHandler | null { - if (!this.options.topicExchange) { - // When "Topic exchange" is not used, wildcards are not supported - // so we can fallback to the default behavior + if (!this.options.wildcards) { return super.getHandlerByPattern(pattern); }