diff --git a/docker-compose.yml b/docker-compose.yml index d9ff7c4c..f653a604 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,11 @@ services: - NODE_ENV=testing - ALLOW_PRIVATE_ADDRESS=true - SKIP_SIGNATURE_VERIFICATION=true + - USE_MQ=true + - MQ_PUBSUB_PROJECT_ID=activitypub + - MQ_PUBSUB_HOST=pubsub:8085 + - MQ_PUBSUB_TOPIC_NAME=activitypub_topic + - MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription command: yarn build:watch depends_on: migrate: @@ -74,7 +79,13 @@ services: image: gcr.io/google.com/cloudsdktool/google-cloud-cli:499.0.0-emulators ports: - "8085:8085" - command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=activitypub + command: /bin/bash -c "/opt/activitypub/start-pubsub.sh" + volumes: + - ./pubsub/start.sh:/opt/activitypub/start-pubsub.sh + environment: + - PROJECT_ID=activitypub + - TOPIC_NAME=activitypub_topic + - SUBSCRIPTION_NAME=activitypub_subscription healthcheck: test: "curl -f http://localhost:8085 || exit 1" interval: 1s @@ -98,6 +109,11 @@ services: - SKIP_SIGNATURE_VERIFICATION=true - ALLOW_PRIVATE_ADDRESS=true - NODE_TLS_REJECT_UNAUTHORIZED=0 + - USE_MQ=true + - MQ_PUBSUB_PROJECT_ID=activitypub + - MQ_PUBSUB_HOST=pubsub-testing:8085 + - MQ_PUBSUB_TOPIC_NAME=activitypub_topic + - MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription command: yarn build:watch depends_on: mysql-testing: @@ -157,7 +173,13 @@ services: image: gcr.io/google.com/cloudsdktool/google-cloud-cli:499.0.0-emulators ports: - "8086:8085" - command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=activitypub + command: /bin/bash -c "/opt/activitypub/start-pubsub.sh" + volumes: + - ./pubsub/start.sh:/opt/activitypub/start-pubsub.sh + environment: + - PROJECT_ID=activitypub + - TOPIC_NAME=activitypub_topic + - SUBSCRIPTION_NAME=activitypub_subscription healthcheck: test: "curl -f http://localhost:8085 || exit 1" interval: 1s diff --git a/pubsub/start.sh b/pubsub/start.sh new file mode 100755 index 00000000..43ff272e --- /dev/null +++ b/pubsub/start.sh @@ -0,0 +1,46 @@ +#!/bin/bash + +# This script is used to start the Pub/Sub emulator and create the required +# topic and subscription (defined in the environment variables) upfront +# +# See: +# https://cloud.google.com/pubsub/docs/emulator +# https://cloud.google.com/pubsub/docs/create-topic#pubsub_create_topic-rest +# https://cloud.google.com/pubsub/docs/create-push-subscription#pubsub_create_push_subscription-rest + +# Ensure we explicitly set the host to 0.0.0.0:8085 so that the emulator will +# listen on all ip addresses and not just IPv6 which is the default +HOST=0.0.0.0:8085 + +# Start the emulator +gcloud beta emulators pubsub start --host-port=${HOST} --project=${PROJECT_ID} & + +# Wait for the emulator to be ready +until curl -f http://${HOST}; do + echo "Waiting for Pub/Sub emulator to start..." + + sleep 1 +done + +# Create the topic via REST API +if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/topics/${TOPIC_NAME} | grep -q "200"; then + echo "Topic created: ${TOPIC_NAME}" +else + echo "Failed to create topic: ${TOPIC_NAME}" + exit 1 +fi + +# Create the subscription via REST API +if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_NAME} \ + -H "Content-Type: application/json" \ + -d '{ + "topic": "projects/'${PROJECT_ID}'/topics/'${TOPIC_NAME}'" +}' | grep -q "200"; then + echo "Subscription created: ${SUBSCRIPTION_NAME}" +else + echo "Failed to create subscription: ${SUBSCRIPTION_NAME}" + exit 1 +fi + +# Keep the container running +tail -f /dev/null diff --git a/src/app.ts b/src/app.ts index 4c8761c4..3bb756b8 100644 --- a/src/app.ts +++ b/src/app.ts @@ -11,6 +11,7 @@ import { Follow, type KvStore, Like, + type MessageQueue, Note, Undo, Update, @@ -75,6 +76,7 @@ import { undoDispatcher, updateDispatcher, } from './dispatchers'; +import { GCloudPubSubMessageQueue } from './fedify/mq/gcloud-pubsub-mq'; import { KnexKvStore } from './knex.kvstore'; import { scopeKvStore } from './kv-helpers'; @@ -88,6 +90,13 @@ import { unlikeAction, } from './handlers'; +import { PubSub } from '@google-cloud/pubsub'; +import { + getFullSubscriptionIdentifier, + getFullTopicIdentifier, + subscriptionExists, + topicExists, +} from 'helpers/gcloud-pubsub'; import { getTraceAndSpanId } from './helpers/context-header'; import { getRequestData } from './helpers/request-data'; import { spanWrapper } from './instrumentation'; @@ -153,8 +162,56 @@ export type ContextData = { const fedifyKv = await KnexKvStore.create(client, 'key_value'); +let messageQueue: MessageQueue | undefined; + +if (process.env.USE_MQ === 'true') { + logging.info('Message queue is enabled'); + + try { + const pubSubClient = new PubSub({ + projectId: process.env.MQ_PUBSUB_PROJECT_ID, + apiEndpoint: process.env.MQ_PUBSUB_HOST, + emulatorMode: process.env.NODE_ENV !== 'production', + }); + + const topicName = process.env.MQ_PUBSUB_TOPIC_NAME ?? 'unknown_topic'; + const subscriptionName = + process.env.MQ_PUBSUB_SUBSCRIPTION_NAME ?? 'unknown_subscription'; + + const topicIdentifier = getFullTopicIdentifier(pubSubClient, topicName); + const subscriptionIdentifier = getFullSubscriptionIdentifier( + pubSubClient, + subscriptionName, + ); + + if (!(await topicExists(pubSubClient, topicIdentifier))) { + throw new Error(`Topic does not exist: ${topicName}`); + } + + if (!(await subscriptionExists(pubSubClient, subscriptionIdentifier))) { + throw new Error(`Subscription does not exist: ${subscriptionName}`); + } + + messageQueue = new GCloudPubSubMessageQueue( + pubSubClient, + topicIdentifier, + subscriptionIdentifier, + logging, + ); + } catch (err) { + logging.error('Failed to initialise message queue {error}', { + error: err, + }); + + process.exit(1); + } +} else { + logging.info('Message queue is disabled'); +} + export const fedify = createFederation({ kv: fedifyKv, + queue: messageQueue, skipSignatureVerification: process.env.SKIP_SIGNATURE_VERIFICATION === 'true' && process.env.NODE_ENV === 'testing', diff --git a/src/fedify/mq/gcloud-pubsub-mq.ts b/src/fedify/mq/gcloud-pubsub-mq.ts new file mode 100644 index 00000000..b5aaa33e --- /dev/null +++ b/src/fedify/mq/gcloud-pubsub-mq.ts @@ -0,0 +1,105 @@ +import type { Message, PubSub } from '@google-cloud/pubsub'; + +import type { + MessageQueue, + MessageQueueEnqueueOptions, + MessageQueueListenOptions, +} from '@fedify/fedify'; +import type { Logger } from '@logtape/logtape'; + +export class GCloudPubSubMessageQueue implements MessageQueue { + private pubSubClient: PubSub; + private topicIdentifier: string; + private subscriptionIdentifier: string; + private logger: Logger; + + constructor( + pubSubClient: PubSub, + topicIdentifier: string, + subscriptionIdentifier: string, + logger: Logger, + ) { + this.pubSubClient = pubSubClient; + this.topicIdentifier = topicIdentifier; + this.subscriptionIdentifier = subscriptionIdentifier; + this.logger = logger; + } + + async enqueue( + message: any, + options?: MessageQueueEnqueueOptions, + ): Promise { + const delay = options?.delay?.total('millisecond') ?? 0; + + this.logger.info( + `Enqueuing message [FedifyID: ${message.id}] with delay: ${delay}ms`, + ); + + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)); + } + + try { + const messageId = await this.pubSubClient + .topic(this.topicIdentifier) + .publishMessage({ + json: message, + attributes: { + fedifyId: message.id, + }, + }); + + this.logger.info( + `Message [FedifyID: ${message.id}] was enqueued [PubSubID: ${messageId}]`, + ); + } catch (error) { + this.logger.error( + `Failed to enqueue message [FedifyID: ${message.id}]: ${error}`, + ); + } + } + + async listen( + handler: (message: any) => Promise | void, + options: MessageQueueListenOptions = {}, + ): Promise { + const subscription = this.pubSubClient.subscription( + this.subscriptionIdentifier, + ); + + subscription.on('message', async (message: Message) => { + const fedifyId = message.attributes.fedifyId ?? 'unknown'; + + this.logger.info( + `Handling message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]`, + ); + + try { + const json = JSON.parse(message.data.toString()); + + await handler(json); + + message.ack(); + + this.logger.info( + `Acknowledged message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]`, + ); + } catch (error) { + message.nack(); + + this.logger.error( + `Failed to handle message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]: ${error}`, + ); + } + }); + + return await new Promise((resolve) => { + options.signal?.addEventListener('abort', () => { + subscription + .removeAllListeners() + .close() + .then(() => resolve()); + }); + }); + } +} diff --git a/src/fedify/mq/gcloud-pubsub-mq.unit.test.ts b/src/fedify/mq/gcloud-pubsub-mq.unit.test.ts new file mode 100644 index 00000000..a70bf8cc --- /dev/null +++ b/src/fedify/mq/gcloud-pubsub-mq.unit.test.ts @@ -0,0 +1,229 @@ +import type { + Message, + PubSub, + Subscription, + Topic, +} from '@google-cloud/pubsub'; +import { Temporal } from '@js-temporal/polyfill'; +import type { Logger } from '@logtape/logtape'; +import { beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { GCloudPubSubMessageQueue } from './gcloud-pubsub-mq'; + +const TOPIC_IDENTIFIER = 'topic'; +const SUBSCRIPTION_IDENTIFIER = 'subscription'; + +describe('enqueue', () => { + const MESSAGE = { id: 'abc123' }; + const MOCK_MESSAGE_ID = 'def789'; + + let mockLogger: Logger; + let mockTopic: Topic; + let mockPubSubClient: PubSub; + let messageQueue: GCloudPubSubMessageQueue; + + beforeAll(() => { + vi.useFakeTimers(); + }); + + beforeEach(() => { + mockLogger = { + info: vi.fn(), + error: vi.fn(), + } as unknown as Logger; + + mockTopic = { + publishMessage: vi.fn().mockResolvedValue(MOCK_MESSAGE_ID), + } as unknown as Topic; + + mockPubSubClient = { + topic: vi.fn().mockImplementation((identifier) => { + if (identifier === TOPIC_IDENTIFIER) { + return mockTopic; + } + throw new Error('Unexpected topic identifier'); + }), + } as unknown as PubSub; + + messageQueue = new GCloudPubSubMessageQueue( + mockPubSubClient, + TOPIC_IDENTIFIER, + SUBSCRIPTION_IDENTIFIER, + mockLogger, + ); + }); + + it('should publish a message without a delay', async () => { + const enqueuePromise = messageQueue.enqueue(MESSAGE); + + vi.runAllTimers(); + await enqueuePromise; + + expect(mockLogger.info).toHaveBeenCalledWith( + `Enqueuing message [FedifyID: ${MESSAGE.id}] with delay: 0ms`, + ); + expect(mockPubSubClient.topic).toHaveBeenCalledWith(TOPIC_IDENTIFIER); + expect(mockTopic.publishMessage).toHaveBeenCalledWith({ + json: MESSAGE, + attributes: { fedifyId: MESSAGE.id }, + }); + expect(mockLogger.info).toHaveBeenCalledWith( + `Message [FedifyID: ${MESSAGE.id}] was enqueued [PubSubID: ${MOCK_MESSAGE_ID}]`, + ); + }); + + it('should publish a message with a delay', async () => { + const delayMs = 1000; + const enqueuePromise = messageQueue.enqueue(MESSAGE, { + delay: Temporal.Duration.from({ milliseconds: delayMs }), + }); + + vi.advanceTimersByTime(delayMs); + await enqueuePromise; + + expect(mockLogger.info).toHaveBeenCalledWith( + `Enqueuing message [FedifyID: ${MESSAGE.id}] with delay: ${delayMs}ms`, + ); + expect(mockPubSubClient.topic).toHaveBeenCalledWith(TOPIC_IDENTIFIER); + expect(mockTopic.publishMessage).toHaveBeenCalledWith({ + json: MESSAGE, + attributes: { fedifyId: MESSAGE.id }, + }); + expect(mockLogger.info).toHaveBeenCalledWith( + `Message [FedifyID: ${MESSAGE.id}] was enqueued [PubSubID: ${MOCK_MESSAGE_ID}]`, + ); + }); + + it('should log an error if the message fails to be published', async () => { + mockTopic.publishMessage = vi + .fn() + .mockRejectedValue(new Error('Failed to publish message')); + + const enqueuePromise = messageQueue.enqueue(MESSAGE); + + vi.runAllTimers(); + await enqueuePromise; + + expect(mockLogger.error).toHaveBeenCalledWith( + `Failed to enqueue message [FedifyID: ${MESSAGE.id}]: Error: Failed to publish message`, + ); + expect(mockPubSubClient.topic).toHaveBeenCalledWith(TOPIC_IDENTIFIER); + expect(mockTopic.publishMessage).toHaveBeenCalledWith({ + json: MESSAGE, + attributes: { fedifyId: MESSAGE.id }, + }); + }); +}); + +describe('listen', () => { + const MESSAGE_DATA = { foo: 'bar' }; + const MESSAGE = { + id: 'abc123', + attributes: { + fedifyId: 'def789', + }, + data: Buffer.from(JSON.stringify(MESSAGE_DATA)), + ack: vi.fn(), + nack: vi.fn(), + } as unknown as Message; + + let mockLogger: Logger; + let mockSubscription: Subscription; + let mockPubSubClient: PubSub; + let messageQueue: GCloudPubSubMessageQueue; + + beforeEach(() => { + mockLogger = { + info: vi.fn(), + error: vi.fn(), + } as unknown as Logger; + + mockSubscription = { + on: vi + .fn() + .mockImplementation( + (event: string, callback: (message: Message) => void) => { + if (event === 'message') { + callback(MESSAGE); + } + }, + ), + removeAllListeners: vi.fn().mockReturnThis(), + close: vi.fn().mockResolvedValue(undefined), + } as unknown as Subscription; + + mockPubSubClient = { + subscription: vi.fn().mockImplementation((identifier) => { + if (identifier === SUBSCRIPTION_IDENTIFIER) { + return mockSubscription; + } + throw new Error('Unexpected subscription identifier'); + }), + } as unknown as PubSub; + + messageQueue = new GCloudPubSubMessageQueue( + mockPubSubClient, + TOPIC_IDENTIFIER, + SUBSCRIPTION_IDENTIFIER, + mockLogger, + ); + }); + + it('should handle and acknowledge a message', async () => { + const abortController = new AbortController(); + const handler = vi.fn().mockResolvedValue(undefined); + + const listenPromise = messageQueue.listen(handler, { + signal: abortController.signal, + }); + + abortController.abort(); + await listenPromise; + + expect(handler).toHaveBeenCalledWith(MESSAGE_DATA); + expect(MESSAGE.ack).toHaveBeenCalled(); + expect(mockLogger.info).toHaveBeenCalledWith( + `Handling message [FedifyID: ${MESSAGE.attributes.fedifyId}, PubSubID: ${MESSAGE.id}]`, + ); + expect(mockLogger.info).toHaveBeenCalledWith( + `Acknowledged message [FedifyID: ${MESSAGE.attributes.fedifyId}, PubSubID: ${MESSAGE.id}]`, + ); + }); + + it('should log an error and nack the message if message handling fails', async () => { + const abortController = new AbortController(); + const handler = vi + .fn() + .mockRejectedValue(new Error('Failed to handle message')); + + const listenPromise = messageQueue.listen(handler, { + signal: abortController.signal, + }); + + abortController.abort(); + await listenPromise; + + expect(handler).toHaveBeenCalledWith(MESSAGE_DATA); + expect(MESSAGE.nack).toHaveBeenCalled(); + expect(mockLogger.info).toHaveBeenCalledWith( + `Handling message [FedifyID: ${MESSAGE.attributes.fedifyId}, PubSubID: ${MESSAGE.id}]`, + ); + expect(mockLogger.error).toHaveBeenCalledWith( + `Failed to handle message [FedifyID: ${MESSAGE.attributes.fedifyId}, PubSubID: ${MESSAGE.id}]: Error: Failed to handle message`, + ); + }); + + it('should clean up the subscription when the abort signal is triggered', async () => { + const abortController = new AbortController(); + const handler = vi.fn(); + + const listenPromise = messageQueue.listen(handler, { + signal: abortController.signal, + }); + + abortController.abort(); + await listenPromise; + + expect(mockSubscription.removeAllListeners).toHaveBeenCalled(); + expect(mockSubscription.close).toHaveBeenCalled(); + }); +}); diff --git a/src/helpers/gcloud-pubsub.ts b/src/helpers/gcloud-pubsub.ts new file mode 100644 index 00000000..cc5f1ec0 --- /dev/null +++ b/src/helpers/gcloud-pubsub.ts @@ -0,0 +1,32 @@ +import type { PubSub } from '@google-cloud/pubsub'; + +export function getFullTopicIdentifier( + client: PubSub, + topicIdentifier: string, +) { + return `projects/${client.projectId}/topics/${topicIdentifier}`; +} + +export async function topicExists(client: PubSub, topicIdentifier: string) { + const [topics] = await client.getTopics(); + + return topics.some((topic) => topic.name === topicIdentifier); +} + +export function getFullSubscriptionIdentifier( + client: PubSub, + subscriptionIdentifier: string, +) { + return `projects/${client.projectId}/subscriptions/${subscriptionIdentifier}`; +} + +export async function subscriptionExists( + client: PubSub, + subscriptionIdentifier: string, +) { + const [subscriptions] = await client.getSubscriptions(); + + return subscriptions.some( + (subscription) => subscription.name === subscriptionIdentifier, + ); +} diff --git a/src/helpers/gcloud-pubsub.unit.test.ts b/src/helpers/gcloud-pubsub.unit.test.ts new file mode 100644 index 00000000..bc9dd00a --- /dev/null +++ b/src/helpers/gcloud-pubsub.unit.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, it, vi } from 'vitest'; + +import type { PubSub } from '@google-cloud/pubsub'; + +import { + getFullSubscriptionIdentifier, + getFullTopicIdentifier, + subscriptionExists, + topicExists, +} from './gcloud-pubsub'; + +const PROJECT_ID = 'foo'; +const TOPIC_NAME = 'bar'; +const SUBSCRIPTION_NAME = 'baz'; + +function getMockClient(implementation: Partial = {}) { + return { + projectId: PROJECT_ID, + ...implementation, + } as unknown as PubSub; +} + +describe('getFullTopicIdentifier', () => { + it('should return the correct full topic identifier', () => { + const mockClient = getMockClient(); + + const result = getFullTopicIdentifier(mockClient, TOPIC_NAME); + + expect(result).toBe(`projects/${PROJECT_ID}/topics/${TOPIC_NAME}`); + }); +}); + +describe('getFullSubscriptionIdentifier', () => { + it('should return the correct full subscription identifier', () => { + const mockClient = getMockClient(); + + const result = getFullSubscriptionIdentifier( + mockClient, + SUBSCRIPTION_NAME, + ); + + expect(result).toBe( + `projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_NAME}`, + ); + }); +}); + +describe('topicExists', () => { + it('should check if a topic exists', async () => { + const topicIdentifier = `projects/${PROJECT_ID}/topics/${TOPIC_NAME}`; + + const mockClient = getMockClient({ + getTopics: vi.fn().mockResolvedValue([[{ name: topicIdentifier }]]), + }); + + const result = await topicExists(mockClient, topicIdentifier); + + expect(result).toBe(true); + }); +}); + +describe('subscriptionExists', () => { + it('should check if a subscription exists', async () => { + const subscriptionIdentifier = `projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_NAME}`; + + const mockClient = getMockClient({ + getSubscriptions: vi + .fn() + .mockResolvedValue([[{ name: subscriptionIdentifier }]]), + }); + + const result = await subscriptionExists( + mockClient, + subscriptionIdentifier, + ); + + expect(result).toBe(true); + }); +});