diff --git a/src/app.ts b/src/app.ts index ccca7a83..4b0a8bba 100644 --- a/src/app.ts +++ b/src/app.ts @@ -165,18 +165,24 @@ if (process.env.USE_MQ === 'true') { logging.info('Message queue is enabled'); try { + const host = String(process.env.MQ_PUBSUB_HOST) || undefined; + const emulatorMode = process.env.NODE_ENV !== 'production'; + const projectId = String(process.env.MQ_PUBSUB_PROJECT_ID) || undefined; + const topicName = String(process.env.MQ_PUBSUB_TOPIC_NAME); + const subscriptionName = String( + process.env.MQ_PUBSUB_SUBSCRIPTION_NAME, + ); + queue = await initGCloudPubSubMessageQueue( logging, eventBus, EVENT_MQ_MESSAGE_RECEIVED, { - host: String(process.env.MQ_PUBSUB_HOST), - emulatorMode: process.env.NODE_ENV !== 'production', - projectId: String(process.env.MQ_PUBSUB_PROJECT_ID), - topicName: String(process.env.MQ_PUBSUB_TOPIC_NAME), - subscriptionName: String( - process.env.MQ_PUBSUB_SUBSCRIPTION_NAME, - ), + host, + emulatorMode, + projectId, + topicName, + subscriptionName, }, ); } catch (err) { diff --git a/src/helpers/gcloud-pubsub-mq.ts b/src/helpers/gcloud-pubsub-mq.ts index 319b69ef..1b649279 100644 --- a/src/helpers/gcloud-pubsub-mq.ts +++ b/src/helpers/gcloud-pubsub-mq.ts @@ -1,5 +1,5 @@ import type { EventEmitter } from 'node:events'; -import { PubSub } from '@google-cloud/pubsub'; +import { type ClientConfig, PubSub } from '@google-cloud/pubsub'; import type { Logger } from '@logtape/logtape'; import { GCloudPubSubMessageQueue } from '../mq/gcloud-pubsub-mq'; @@ -36,9 +36,9 @@ async function subscriptionExists( } type InitGCloudPubSubMessageQueueOptions = { - host: string; - emulatorMode: boolean; - projectId: string; + host?: string; + emulatorMode?: boolean; + projectId?: string; topicName: string; subscriptionName: string; }; @@ -55,15 +55,28 @@ export async function initGCloudPubSubMessageQueue( subscriptionName = 'unknown_subscription', }: InitGCloudPubSubMessageQueueOptions, ) { - const pubSubClient = new PubSub({ - projectId, - apiEndpoint: host, - emulatorMode, - }); + const pubsubClientConfig: Partial = {}; + + if (host !== undefined) { + pubsubClientConfig.apiEndpoint = host; + } + + if (emulatorMode !== undefined) { + pubsubClientConfig.emulatorMode = emulatorMode; + } - const topicIdentifier = getFullTopicIdentifier(projectId, topicName); + if (projectId !== undefined) { + pubsubClientConfig.projectId = projectId; + } + + const pubSubClient = new PubSub(pubsubClientConfig); + + const topicIdentifier = getFullTopicIdentifier( + pubSubClient.projectId, + topicName, + ); const subscriptionIdentifier = getFullSubscriptionIdentifier( - projectId, + pubSubClient.projectId, subscriptionName, ); diff --git a/src/helpers/gcloud-pubsub-mq.unit.test.ts b/src/helpers/gcloud-pubsub-mq.unit.test.ts index 02669bbf..11ba3176 100644 --- a/src/helpers/gcloud-pubsub-mq.unit.test.ts +++ b/src/helpers/gcloud-pubsub-mq.unit.test.ts @@ -69,6 +69,7 @@ describe('initGCloudPubSubMessageQueue', () => { return { getTopics: vi.fn().mockResolvedValue([topics]), getSubscriptions: vi.fn().mockResolvedValue([subscriptions]), + projectId: options.projectId, }; }); @@ -100,6 +101,55 @@ describe('initGCloudPubSubMessageQueue', () => { ); }); + it('should only explicitly provide options to the PubSub client if they are defined', async () => { + const projectId = 'foo'; + const options = { + topicName: 'baz', + subscriptionName: 'qux', + }; + const topics: Topic[] = [ + { + name: getFullTopicIdentifier(projectId, options.topicName), + } as Topic, + ]; + const subscriptions: Subscription[] = [ + { + name: getFullSubscriptionIdentifier( + projectId, + options.subscriptionName, + ), + } as Subscription, + ]; + + (vi.mocked(PubSub) as Mock).mockImplementation(() => { + return { + getTopics: vi.fn().mockResolvedValue([topics]), + getSubscriptions: vi.fn().mockResolvedValue([subscriptions]), + projectId, + }; + }); + + await expect( + initGCloudPubSubMessageQueue( + mockLogger, + mockEventBus, + EVENT_NAME, + options, + ), + ).resolves.toBeInstanceOf(GCloudPubSubMessageQueue); + + expect(PubSub).toHaveBeenCalledWith({}); + + expect(GCloudPubSubMessageQueue).toHaveBeenCalledWith( + expect.any(Object), + mockEventBus, + mockLogger, + getFullTopicIdentifier(projectId, options.topicName), + getFullSubscriptionIdentifier(projectId, options.subscriptionName), + EVENT_NAME, + ); + }); + it('should throw an error if the topic does not exist', async () => { const options = { host: 'foo', @@ -113,6 +163,7 @@ describe('initGCloudPubSubMessageQueue', () => { (vi.mocked(PubSub) as Mock).mockImplementation(() => { return { getTopics: vi.fn().mockResolvedValue([topics]), + projectId: options.projectId, }; }); @@ -148,6 +199,7 @@ describe('initGCloudPubSubMessageQueue', () => { return { getTopics: vi.fn().mockResolvedValue([topics]), getSubscriptions: vi.fn().mockResolvedValue([subscriptions]), + projectId: options.projectId, }; });