From 8ebc64c314fc42ca8f265ec1798be170d3e6a172 Mon Sep 17 00:00:00 2001 From: Michael Barrett Date: Thu, 7 Nov 2024 16:38:17 +0000 Subject: [PATCH] wip --- docker-compose.yml | 4 ++ src/app.ts | 14 +++++ src/fedify/mq/gcloud-pubsub-mq.ts | 83 ++++++++++++++++++++++++++ src/helpers/gcloud-pubsub.ts | 54 +++++++++++++++++ src/helpers/gcloud-pubsub.unit.test.ts | 13 ++++ 5 files changed, 168 insertions(+) create mode 100644 src/fedify/mq/gcloud-pubsub-mq.ts create mode 100644 src/helpers/gcloud-pubsub.ts create mode 100644 src/helpers/gcloud-pubsub.unit.test.ts diff --git a/docker-compose.yml b/docker-compose.yml index d9ff7c4c..58401aa0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,8 @@ services: - NODE_ENV=testing - ALLOW_PRIVATE_ADDRESS=true - SKIP_SIGNATURE_VERIFICATION=true + - GCP_PROJECT_ID=activitypub + - GCP_PUBSUB_HOST=pubsub:8085 command: yarn build:watch depends_on: migrate: @@ -98,6 +100,8 @@ services: - SKIP_SIGNATURE_VERIFICATION=true - ALLOW_PRIVATE_ADDRESS=true - NODE_TLS_REJECT_UNAUTHORIZED=0 + - GCP_PROJECT_ID=activitypub + - GCP_PUBSUB_HOST=pubsub-testing:8085 command: yarn build:watch depends_on: mysql-testing: diff --git a/src/app.ts b/src/app.ts index 4c8761c4..608c12d8 100644 --- a/src/app.ts +++ b/src/app.ts @@ -11,6 +11,7 @@ import { Follow, type KvStore, Like, + type MessageQueue, Note, Undo, Update, @@ -75,6 +76,7 @@ import { undoDispatcher, updateDispatcher, } from './dispatchers'; +import { GCloudPubSubMessageQueue } from './fedify/mq/gcloud-pubsub-mq'; import { KnexKvStore } from './knex.kvstore'; import { scopeKvStore } from './kv-helpers'; @@ -89,6 +91,7 @@ import { } from './handlers'; import { getTraceAndSpanId } from './helpers/context-header'; +import { initPubSub } from './helpers/gcloud-pubsub'; import { getRequestData } from './helpers/request-data'; import { spanWrapper } from './instrumentation'; @@ -153,8 +156,19 @@ export type ContextData = { const fedifyKv = await KnexKvStore.create(client, 'key_value'); +let messageQueue: MessageQueue | undefined; + +try { + const { topic, subscription } = await initPubSub(); + + messageQueue = new GCloudPubSubMessageQueue(topic, subscription, logging); +} catch (err) { + logging.error('Failed to initialise message queue {error}', { error: err }); +} + export const fedify = createFederation({ kv: fedifyKv, + queue: messageQueue, skipSignatureVerification: process.env.SKIP_SIGNATURE_VERIFICATION === 'true' && process.env.NODE_ENV === 'testing', diff --git a/src/fedify/mq/gcloud-pubsub-mq.ts b/src/fedify/mq/gcloud-pubsub-mq.ts new file mode 100644 index 00000000..1f6d40c4 --- /dev/null +++ b/src/fedify/mq/gcloud-pubsub-mq.ts @@ -0,0 +1,83 @@ +import type { Message, Subscription, Topic } from '@google-cloud/pubsub'; + +import type { + MessageQueue, + MessageQueueEnqueueOptions, + MessageQueueListenOptions, +} from '@fedify/fedify'; +import type { Logger } from '@logtape/logtape'; + +export class GCloudPubSubMessageQueue implements MessageQueue { + private topic: Topic; + private subscription: Subscription; + private logger: Logger; + + constructor(topic: Topic, subscription: Subscription, logger: Logger) { + this.topic = topic; + this.subscription = subscription; + this.logger = logger; + } + + async enqueue( + message: any, + options?: MessageQueueEnqueueOptions, + ): Promise { + const delay = options?.delay?.total('millisecond') ?? 0; + + this.logger.info( + `Enqueuing message [FedifyID: ${message.id}] with delay: ${delay}ms`, + ); + + setTimeout(() => { + this.topic + .publishMessage({ + json: message, + attributes: { + fedifyID: message.id, + }, + }) + .then((messageId) => { + this.logger.info( + `Message [FedifyID: ${message.id}] was enqueued [PubSubID: ${messageId}]`, + ); + }); + }, delay); + } + + async listen( + handler: (message: any) => Promise | void, + options: MessageQueueListenOptions = {}, + ): Promise { + this.subscription.on('message', async (message: Message) => { + const fedifyID = message.attributes.fedifyID ?? 'unknown'; + + this.logger.info( + `Handling message [FedifyID: ${fedifyID}, PubSubID: ${message.id}]`, + ); + + try { + const json = JSON.parse(message.data.toString()); + + await handler(json); + + message.ack(); + + this.logger.info( + `Acknowledged message [FedifyID: ${fedifyID}, PubSubID: ${message.id}]`, + ); + } catch (error) { + this.logger.error( + `Failed to handle message [FedifyID: ${fedifyID}, PubSubID: ${message.id}]: ${error}`, + ); + } + }); + + return await new Promise((resolve) => { + options.signal?.addEventListener('abort', () => { + this.subscription.removeAllListeners(); + + this.subscription.close().then(() => resolve()); + }); + }); + } +} diff --git a/src/helpers/gcloud-pubsub.ts b/src/helpers/gcloud-pubsub.ts new file mode 100644 index 00000000..b591ea3b --- /dev/null +++ b/src/helpers/gcloud-pubsub.ts @@ -0,0 +1,54 @@ +import { PubSub, type Subscription, type Topic } from '@google-cloud/pubsub'; + +const TOPIC_FEDIFY = 'fedify'; + +export function generateSubscriptionName() { + // TODO: Make this unique in case of multiple instances of the same service + return `${TOPIC_FEDIFY}_subscription`; +} + +export async function initPubSub() { + // Initialise pubsub client + const client = new PubSub({ + projectId: process.env.GCP_PROJECT_ID, + apiEndpoint: process.env.GCP_PUBSUB_HOST, + emulatorMode: process.env.NODE_ENV !== 'production', + }); + + // Initialise topic + const [topics] = await client.getTopics(); + + let topic: Topic | undefined = topics.find( + (topic) => + topic.name === + `projects/${process.env.GCP_PROJECT_ID}/topics/${TOPIC_FEDIFY}`, + ); + + if (topic === undefined) { + [topic] = await client.createTopic(TOPIC_FEDIFY); + } + + // Initialise subscription + const subscriptionName = generateSubscriptionName(); + let subscription: Subscription | undefined; + + const [subscriptions] = await topic.getSubscriptions(); + + for (const _subscription of subscriptions) { + const [metadata] = await _subscription.getMetadata(); + + if ( + metadata.name === + `projects/${process.env.GCP_PROJECT_ID}/subscriptions/${subscriptionName}` + ) { + subscription = _subscription; + } + } + + if (subscription === undefined) { + [subscription] = await topic.createSubscription(subscriptionName); + } + + // Return topic and subscription + return { topic, subscription }; +} diff --git a/src/helpers/gcloud-pubsub.unit.test.ts b/src/helpers/gcloud-pubsub.unit.test.ts new file mode 100644 index 00000000..3226fc1e --- /dev/null +++ b/src/helpers/gcloud-pubsub.unit.test.ts @@ -0,0 +1,13 @@ +import { describe, expect, it } from 'vitest'; + +import { generateSubscriptionName } from './gcloud-pubsub'; + +describe('gcloud-pubsub', () => { + describe('generateSubscriptionName', () => { + it('should generate a unique subscription name', () => { + const result = generateSubscriptionName(); + + expect(result).toBe('fedify_subscription'); + }); + }); +});