Skip to content

Commit

Permalink
Added GCloud Pub/Sub message queue implementation
Browse files Browse the repository at this point in the history
refs [AP-566](https://linear.app/ghost/issue/AP-566/implement-a-pubsub-backed-queue-for-fedify)

Added and configured a GCloud Pub/Sub message queue implementation for Fedify
  • Loading branch information
mike182uk committed Nov 11, 2024
1 parent b694767 commit 47e5a54
Show file tree
Hide file tree
Showing 7 changed files with 572 additions and 2 deletions.
26 changes: 24 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ services:
- NODE_ENV=testing
- ALLOW_PRIVATE_ADDRESS=true
- SKIP_SIGNATURE_VERIFICATION=true
- USE_MQ=true
- MQ_PUBSUB_PROJECT_ID=activitypub
- MQ_PUBSUB_HOST=pubsub:8085
- MQ_PUBSUB_TOPIC_NAME=activitypub_topic
- MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription
command: yarn build:watch
depends_on:
migrate:
Expand Down Expand Up @@ -74,7 +79,13 @@ services:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:499.0.0-emulators
ports:
- "8085:8085"
command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=activitypub
command: /bin/bash -c "/opt/activitypub/start-pubsub.sh"
volumes:
- ./pubsub/start.sh:/opt/activitypub/start-pubsub.sh
environment:
- PROJECT_ID=activitypub
- TOPIC_NAME=activitypub_topic
- SUBSCRIPTION_NAME=activitypub_subscription
healthcheck:
test: "curl -f http://localhost:8085 || exit 1"
interval: 1s
Expand All @@ -98,6 +109,11 @@ services:
- SKIP_SIGNATURE_VERIFICATION=true
- ALLOW_PRIVATE_ADDRESS=true
- NODE_TLS_REJECT_UNAUTHORIZED=0
- USE_MQ=true
- MQ_PUBSUB_PROJECT_ID=activitypub
- MQ_PUBSUB_HOST=pubsub-testing:8085
- MQ_PUBSUB_TOPIC_NAME=activitypub_topic
- MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription
command: yarn build:watch
depends_on:
mysql-testing:
Expand Down Expand Up @@ -157,7 +173,13 @@ services:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:499.0.0-emulators
ports:
- "8086:8085"
command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=activitypub
command: /bin/bash -c "/opt/activitypub/start-pubsub.sh"
volumes:
- ./pubsub/start.sh:/opt/activitypub/start-pubsub.sh
environment:
- PROJECT_ID=activitypub
- TOPIC_NAME=activitypub_topic
- SUBSCRIPTION_NAME=activitypub_subscription
healthcheck:
test: "curl -f http://localhost:8085 || exit 1"
interval: 1s
Expand Down
46 changes: 46 additions & 0 deletions pubsub/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/bin/bash

# This script is used to start the Pub/Sub emulator and create the required
# topic and subscription (defined in the environment variables) upfront
#
# See:
# https://cloud.google.com/pubsub/docs/emulator
# https://cloud.google.com/pubsub/docs/create-topic#pubsub_create_topic-rest
# https://cloud.google.com/pubsub/docs/create-push-subscription#pubsub_create_push_subscription-rest

# Ensure we explicitly set the host to 0.0.0.0:8085 so that the emulator will
# listen on all ip addresses and not just IPv6 which is the default
HOST=0.0.0.0:8085

# Start the emulator
gcloud beta emulators pubsub start --host-port=${HOST} --project=${PROJECT_ID} &

# Wait for the emulator to be ready
until curl -f http://${HOST}; do
echo "Waiting for Pub/Sub emulator to start..."

sleep 1
done

# Create the topic via REST API
if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/topics/${TOPIC_NAME} | grep -q "200"; then
echo "Topic created: ${TOPIC_NAME}"
else
echo "Failed to create topic: ${TOPIC_NAME}"
exit 1
fi

# Create the subscription via REST API
if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_NAME} \
-H "Content-Type: application/json" \
-d '{
"topic": "projects/'${PROJECT_ID}'/topics/'${TOPIC_NAME}'"
}' | grep -q "200"; then
echo "Subscription created: ${SUBSCRIPTION_NAME}"
else
echo "Failed to create subscription: ${SUBSCRIPTION_NAME}"
exit 1
fi

# Keep the container running
tail -f /dev/null
57 changes: 57 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
Follow,
type KvStore,
Like,
type MessageQueue,
Note,
Undo,
Update,
Expand Down Expand Up @@ -75,6 +76,7 @@ import {
undoDispatcher,
updateDispatcher,
} from './dispatchers';
import { GCloudPubSubMessageQueue } from './fedify/mq/gcloud-pubsub-mq';
import { KnexKvStore } from './knex.kvstore';
import { scopeKvStore } from './kv-helpers';

Expand All @@ -88,6 +90,13 @@ import {
unlikeAction,
} from './handlers';

import { PubSub } from '@google-cloud/pubsub';
import {
getFullSubscriptionIdentifier,
getFullTopicIdentifier,
subscriptionExists,
topicExists,
} from 'helpers/gcloud-pubsub';
import { getTraceAndSpanId } from './helpers/context-header';
import { getRequestData } from './helpers/request-data';
import { spanWrapper } from './instrumentation';
Expand Down Expand Up @@ -153,8 +162,56 @@ export type ContextData = {

const fedifyKv = await KnexKvStore.create(client, 'key_value');

let messageQueue: MessageQueue | undefined;

if (process.env.USE_MQ === 'true') {
logging.info('Message queue is enabled');

try {
const pubSubClient = new PubSub({
projectId: process.env.MQ_PUBSUB_PROJECT_ID,
apiEndpoint: process.env.MQ_PUBSUB_HOST,
emulatorMode: process.env.NODE_ENV !== 'production',
});

const topicName = process.env.MQ_PUBSUB_TOPIC_NAME ?? 'unknown_topic';
const subscriptionName =
process.env.MQ_PUBSUB_SUBSCRIPTION_NAME ?? 'unknown_subscription';

const topicIdentifier = getFullTopicIdentifier(pubSubClient, topicName);
const subscriptionIdentifier = getFullSubscriptionIdentifier(
pubSubClient,
subscriptionName,
);

if (!(await topicExists(pubSubClient, topicIdentifier))) {
throw new Error(`Topic does not exist: ${topicName}`);
}

if (!(await subscriptionExists(pubSubClient, subscriptionIdentifier))) {
throw new Error(`Subscription does not exist: ${subscriptionName}`);
}

messageQueue = new GCloudPubSubMessageQueue(
pubSubClient,
topicIdentifier,
subscriptionIdentifier,
logging,
);
} catch (err) {
logging.error('Failed to initialise message queue {error}', {
error: err,
});

process.exit(1);
}
} else {
logging.info('Message queue is disabled');
}

export const fedify = createFederation<ContextData>({
kv: fedifyKv,
queue: messageQueue,
skipSignatureVerification:
process.env.SKIP_SIGNATURE_VERIFICATION === 'true' &&
process.env.NODE_ENV === 'testing',
Expand Down
105 changes: 105 additions & 0 deletions src/fedify/mq/gcloud-pubsub-mq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import type { Message, PubSub } from '@google-cloud/pubsub';

import type {
MessageQueue,
MessageQueueEnqueueOptions,
MessageQueueListenOptions,
} from '@fedify/fedify';
import type { Logger } from '@logtape/logtape';

export class GCloudPubSubMessageQueue implements MessageQueue {
private pubSubClient: PubSub;
private topicIdentifier: string;
private subscriptionIdentifier: string;
private logger: Logger;

constructor(
pubSubClient: PubSub,
topicIdentifier: string,
subscriptionIdentifier: string,
logger: Logger,
) {
this.pubSubClient = pubSubClient;
this.topicIdentifier = topicIdentifier;
this.subscriptionIdentifier = subscriptionIdentifier;
this.logger = logger;
}

async enqueue(
message: any,
options?: MessageQueueEnqueueOptions,
): Promise<void> {
const delay = options?.delay?.total('millisecond') ?? 0;

this.logger.info(
`Enqueuing message [FedifyID: ${message.id}] with delay: ${delay}ms`,
);

if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}

try {
const messageId = await this.pubSubClient
.topic(this.topicIdentifier)
.publishMessage({
json: message,
attributes: {
fedifyId: message.id,
},
});

this.logger.info(
`Message [FedifyID: ${message.id}] was enqueued [PubSubID: ${messageId}]`,
);
} catch (error) {
this.logger.error(
`Failed to enqueue message [FedifyID: ${message.id}]: ${error}`,
);
}
}

async listen(
handler: (message: any) => Promise<void> | void,
options: MessageQueueListenOptions = {},
): Promise<void> {
const subscription = this.pubSubClient.subscription(
this.subscriptionIdentifier,
);

subscription.on('message', async (message: Message) => {
const fedifyId = message.attributes.fedifyId ?? 'unknown';

this.logger.info(
`Handling message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]`,
);

try {
const json = JSON.parse(message.data.toString());

await handler(json);

message.ack();

this.logger.info(
`Acknowledged message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]`,
);
} catch (error) {
message.nack();

this.logger.error(
`Failed to handle message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]: ${error}`,
);
}
});

return await new Promise((resolve) => {
options.signal?.addEventListener('abort', () => {
subscription
.removeAllListeners()
.close()
.then(() => resolve());
});
});
}
}
Loading

0 comments on commit 47e5a54

Please sign in to comment.