Skip to content

Commit

Permalink
Made pubsub client options optional
Browse files Browse the repository at this point in the history
  • Loading branch information
mike182uk committed Nov 14, 2024
1 parent 0e085fe commit 283693f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 18 deletions.
20 changes: 13 additions & 7 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,24 @@ if (process.env.USE_MQ === 'true') {
logging.info('Message queue is enabled');

try {
const host = String(process.env.MQ_PUBSUB_HOST) || undefined;
const emulatorMode = process.env.NODE_ENV !== 'production';
const projectId = String(process.env.MQ_PUBSUB_PROJECT_ID) || undefined;
const topicName = String(process.env.MQ_PUBSUB_TOPIC_NAME);
const subscriptionName = String(
process.env.MQ_PUBSUB_SUBSCRIPTION_NAME,
);

queue = await initGCloudPubSubMessageQueue(
logging,
eventBus,
EVENT_MQ_MESSAGE_RECEIVED,
{
host: String(process.env.MQ_PUBSUB_HOST),
emulatorMode: process.env.NODE_ENV !== 'production',
projectId: String(process.env.MQ_PUBSUB_PROJECT_ID),
topicName: String(process.env.MQ_PUBSUB_TOPIC_NAME),
subscriptionName: String(
process.env.MQ_PUBSUB_SUBSCRIPTION_NAME,
),
host,
emulatorMode,
projectId,
topicName,
subscriptionName,
},
);
} catch (err) {
Expand Down
35 changes: 24 additions & 11 deletions src/helpers/gcloud-pubsub-mq.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { EventEmitter } from 'node:events';
import { PubSub } from '@google-cloud/pubsub';
import { type ClientConfig, PubSub } from '@google-cloud/pubsub';
import type { Logger } from '@logtape/logtape';

import { GCloudPubSubMessageQueue } from '../mq/gcloud-pubsub-mq';
Expand Down Expand Up @@ -36,9 +36,9 @@ async function subscriptionExists(
}

type InitGCloudPubSubMessageQueueOptions = {
host: string;
emulatorMode: boolean;
projectId: string;
host?: string;
emulatorMode?: boolean;
projectId?: string;
topicName: string;
subscriptionName: string;
};
Expand All @@ -55,15 +55,28 @@ export async function initGCloudPubSubMessageQueue(
subscriptionName = 'unknown_subscription',
}: InitGCloudPubSubMessageQueueOptions,
) {
const pubSubClient = new PubSub({
projectId,
apiEndpoint: host,
emulatorMode,
});
const pubsubClientConfig: Partial<ClientConfig> = {};

if (host !== undefined) {
pubsubClientConfig.apiEndpoint = host;
}

if (emulatorMode !== undefined) {
pubsubClientConfig.emulatorMode = emulatorMode;
}

const topicIdentifier = getFullTopicIdentifier(projectId, topicName);
if (projectId !== undefined) {
pubsubClientConfig.projectId = projectId;
}

const pubSubClient = new PubSub(pubsubClientConfig);

const topicIdentifier = getFullTopicIdentifier(
pubSubClient.projectId,
topicName,
);
const subscriptionIdentifier = getFullSubscriptionIdentifier(
projectId,
pubSubClient.projectId,
subscriptionName,
);

Expand Down
52 changes: 52 additions & 0 deletions src/helpers/gcloud-pubsub-mq.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ describe('initGCloudPubSubMessageQueue', () => {
return {
getTopics: vi.fn().mockResolvedValue([topics]),
getSubscriptions: vi.fn().mockResolvedValue([subscriptions]),
projectId: options.projectId,
};
});

Expand Down Expand Up @@ -100,6 +101,55 @@ describe('initGCloudPubSubMessageQueue', () => {
);
});

it('should only explicitly provide options to the PubSub client if they are defined', async () => {
const projectId = 'foo';
const options = {
topicName: 'baz',
subscriptionName: 'qux',
};
const topics: Topic[] = [
{
name: getFullTopicIdentifier(projectId, options.topicName),
} as Topic,
];
const subscriptions: Subscription[] = [
{
name: getFullSubscriptionIdentifier(
projectId,
options.subscriptionName,
),
} as Subscription,
];

(vi.mocked(PubSub) as Mock).mockImplementation(() => {
return {
getTopics: vi.fn().mockResolvedValue([topics]),
getSubscriptions: vi.fn().mockResolvedValue([subscriptions]),
projectId,
};
});

await expect(
initGCloudPubSubMessageQueue(
mockLogger,
mockEventBus,
EVENT_NAME,
options,
),
).resolves.toBeInstanceOf(GCloudPubSubMessageQueue);

expect(PubSub).toHaveBeenCalledWith({});

expect(GCloudPubSubMessageQueue).toHaveBeenCalledWith(
expect.any(Object),
mockEventBus,
mockLogger,
getFullTopicIdentifier(projectId, options.topicName),
getFullSubscriptionIdentifier(projectId, options.subscriptionName),
EVENT_NAME,
);
});

it('should throw an error if the topic does not exist', async () => {
const options = {
host: 'foo',
Expand All @@ -113,6 +163,7 @@ describe('initGCloudPubSubMessageQueue', () => {
(vi.mocked(PubSub) as Mock).mockImplementation(() => {
return {
getTopics: vi.fn().mockResolvedValue([topics]),
projectId: options.projectId,
};
});

Expand Down Expand Up @@ -148,6 +199,7 @@ describe('initGCloudPubSubMessageQueue', () => {
return {
getTopics: vi.fn().mockResolvedValue([topics]),
getSubscriptions: vi.fn().mockResolvedValue([subscriptions]),
projectId: options.projectId,
};
});

Expand Down

0 comments on commit 283693f

Please sign in to comment.