Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mike182uk committed Nov 11, 2024
1 parent b694767 commit 8ebc64c
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ services:
- NODE_ENV=testing
- ALLOW_PRIVATE_ADDRESS=true
- SKIP_SIGNATURE_VERIFICATION=true
- GCP_PROJECT_ID=activitypub
- GCP_PUBSUB_HOST=pubsub:8085
command: yarn build:watch
depends_on:
migrate:
Expand Down Expand Up @@ -98,6 +100,8 @@ services:
- SKIP_SIGNATURE_VERIFICATION=true
- ALLOW_PRIVATE_ADDRESS=true
- NODE_TLS_REJECT_UNAUTHORIZED=0
- GCP_PROJECT_ID=activitypub
- GCP_PUBSUB_HOST=pubsub-testing:8085
command: yarn build:watch
depends_on:
mysql-testing:
Expand Down
14 changes: 14 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
Follow,
type KvStore,
Like,
type MessageQueue,
Note,
Undo,
Update,
Expand Down Expand Up @@ -75,6 +76,7 @@ import {
undoDispatcher,
updateDispatcher,
} from './dispatchers';
import { GCloudPubSubMessageQueue } from './fedify/mq/gcloud-pubsub-mq';
import { KnexKvStore } from './knex.kvstore';
import { scopeKvStore } from './kv-helpers';

Expand All @@ -89,6 +91,7 @@ import {
} from './handlers';

import { getTraceAndSpanId } from './helpers/context-header';
import { initPubSub } from './helpers/gcloud-pubsub';
import { getRequestData } from './helpers/request-data';
import { spanWrapper } from './instrumentation';

Expand Down Expand Up @@ -153,8 +156,19 @@ export type ContextData = {

const fedifyKv = await KnexKvStore.create(client, 'key_value');

let messageQueue: MessageQueue | undefined;

try {
const { topic, subscription } = await initPubSub();

messageQueue = new GCloudPubSubMessageQueue(topic, subscription, logging);
} catch (err) {
logging.error('Failed to initialise message queue {error}', { error: err });
}

export const fedify = createFederation<ContextData>({
kv: fedifyKv,
queue: messageQueue,
skipSignatureVerification:
process.env.SKIP_SIGNATURE_VERIFICATION === 'true' &&
process.env.NODE_ENV === 'testing',
Expand Down
83 changes: 83 additions & 0 deletions src/fedify/mq/gcloud-pubsub-mq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import type { Message, Subscription, Topic } from '@google-cloud/pubsub';

import type {
MessageQueue,
MessageQueueEnqueueOptions,
MessageQueueListenOptions,
} from '@fedify/fedify';
import type { Logger } from '@logtape/logtape';

export class GCloudPubSubMessageQueue implements MessageQueue {
private topic: Topic;
private subscription: Subscription;
private logger: Logger;

constructor(topic: Topic, subscription: Subscription, logger: Logger) {
this.topic = topic;
this.subscription = subscription;
this.logger = logger;
}

async enqueue(
message: any,
options?: MessageQueueEnqueueOptions,
): Promise<void> {
const delay = options?.delay?.total('millisecond') ?? 0;

this.logger.info(
`Enqueuing message [FedifyID: ${message.id}] with delay: ${delay}ms`,
);

setTimeout(() => {
this.topic
.publishMessage({
json: message,
attributes: {
fedifyID: message.id,
},
})
.then((messageId) => {
this.logger.info(
`Message [FedifyID: ${message.id}] was enqueued [PubSubID: ${messageId}]`,
);
});
}, delay);
}

async listen(
handler: (message: any) => Promise<void> | void,
options: MessageQueueListenOptions = {},
): Promise<void> {
this.subscription.on('message', async (message: Message) => {
const fedifyID = message.attributes.fedifyID ?? 'unknown';

this.logger.info(
`Handling message [FedifyID: ${fedifyID}, PubSubID: ${message.id}]`,
);

try {
const json = JSON.parse(message.data.toString());

await handler(json);

message.ack();

this.logger.info(
`Acknowledged message [FedifyID: ${fedifyID}, PubSubID: ${message.id}]`,
);
} catch (error) {
this.logger.error(
`Failed to handle message [FedifyID: ${fedifyID}, PubSubID: ${message.id}]: ${error}`,
);
}
});

return await new Promise((resolve) => {
options.signal?.addEventListener('abort', () => {
this.subscription.removeAllListeners();

this.subscription.close().then(() => resolve());
});
});
}
}
54 changes: 54 additions & 0 deletions src/helpers/gcloud-pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { PubSub, type Subscription, type Topic } from '@google-cloud/pubsub';

const TOPIC_FEDIFY = 'fedify';

export function generateSubscriptionName() {
// TODO: Make this unique in case of multiple instances of the same service
return `${TOPIC_FEDIFY}_subscription`;
}

export async function initPubSub() {
// Initialise pubsub client
const client = new PubSub({
projectId: process.env.GCP_PROJECT_ID,
apiEndpoint: process.env.GCP_PUBSUB_HOST,
emulatorMode: process.env.NODE_ENV !== 'production',
});

// Initialise topic
const [topics] = await client.getTopics();

let topic: Topic | undefined = topics.find(
(topic) =>
topic.name ===
`projects/${process.env.GCP_PROJECT_ID}/topics/${TOPIC_FEDIFY}`,
);

if (topic === undefined) {
[topic] = await client.createTopic(TOPIC_FEDIFY);
}

// Initialise subscription
const subscriptionName = generateSubscriptionName();
let subscription: Subscription | undefined;

const [subscriptions] = await topic.getSubscriptions();

for (const _subscription of subscriptions) {
const [metadata] = await _subscription.getMetadata();

if (
metadata.name ===
`projects/${process.env.GCP_PROJECT_ID}/subscriptions/${subscriptionName}`
) {
subscription = _subscription;
}
}

if (subscription === undefined) {
[subscription] = await topic.createSubscription(subscriptionName);
}

// Return topic and subscription
return { topic, subscription };
}
13 changes: 13 additions & 0 deletions src/helpers/gcloud-pubsub.unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { describe, expect, it } from 'vitest';

import { generateSubscriptionName } from './gcloud-pubsub';

describe('gcloud-pubsub', () => {
describe('generateSubscriptionName', () => {
it('should generate a unique subscription name', () => {
const result = generateSubscriptionName();

expect(result).toBe('fedify_subscription');
});
});
});

0 comments on commit 8ebc64c

Please sign in to comment.