Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mike182uk committed Nov 14, 2024
1 parent 0e085fe commit 0a8a892
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 110 deletions.
54 changes: 0 additions & 54 deletions src/api/action/mq/handleMessage.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export { getActivitiesAction } from './action/getActivities';
export { getActivityThreadAction } from './action/getActivityThread';
export { handleMessageAction } from './action/mq/handleMessage';
export { profileGetAction } from './action/profile/get';
export { profileGetFollowersAction } from './action/profile/getFollowers';
export { profileGetFollowingAction } from './action/profile/getFollowing';
Expand Down
41 changes: 14 additions & 27 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import './instrumentation';

import { AsyncLocalStorage } from 'node:async_hooks';
import { createHmac } from 'node:crypto';
import { EventEmitter } from 'node:events';
import {
Accept,
Announce,
Expand Down Expand Up @@ -40,7 +39,6 @@ import { behindProxy } from 'x-forwarded-fetch';
import {
getActivitiesAction,
getActivityThreadAction,
handleMessageAction,
profileGetAction,
profileGetFollowersAction,
profileGetFollowingAction,
Expand Down Expand Up @@ -81,7 +79,6 @@ import {
import { KnexKvStore } from './knex.kvstore';
import { scopeKvStore } from './kv-helpers';

import { EVENT_MQ_MESSAGE_RECEIVED } from './constants';
import {
followAction,
inboxHandler,
Expand All @@ -95,6 +92,7 @@ import { getTraceAndSpanId } from './helpers/context-header';
import { initGCloudPubSubMessageQueue } from './helpers/gcloud-pubsub-mq';
import { getRequestData } from './helpers/request-data';
import { spanWrapper } from './instrumentation';
import { GCloudPubSubMessageQueue } from './mq/gcloud-pubsub-mq';

const logging = getLogger(['activitypub']);

Expand Down Expand Up @@ -157,28 +155,19 @@ export type ContextData = {

const fedifyKv = await KnexKvStore.create(client, 'key_value');

const eventBus = new EventEmitter();

let queue: MessageQueue | undefined;

if (process.env.USE_MQ === 'true') {
logging.info('Message queue is enabled');

try {
queue = await initGCloudPubSubMessageQueue(
logging,
eventBus,
EVENT_MQ_MESSAGE_RECEIVED,
{
host: String(process.env.MQ_PUBSUB_HOST),
emulatorMode: process.env.NODE_ENV !== 'production',
projectId: String(process.env.MQ_PUBSUB_PROJECT_ID),
topicName: String(process.env.MQ_PUBSUB_TOPIC_NAME),
subscriptionName: String(
process.env.MQ_PUBSUB_SUBSCRIPTION_NAME,
),
},
);
queue = await initGCloudPubSubMessageQueue(logging, {
host: String(process.env.MQ_PUBSUB_HOST),
emulatorMode: process.env.NODE_ENV !== 'production',
projectId: String(process.env.MQ_PUBSUB_PROJECT_ID),
topicName: String(process.env.MQ_PUBSUB_TOPIC_NAME),
subscriptionName: String(process.env.MQ_PUBSUB_SUBSCRIPTION_NAME),
});
} catch (err) {
logging.error('Failed to initialise message queue {error}', {
error: err,
Expand Down Expand Up @@ -355,7 +344,6 @@ export type HonoContextVariables = {
host: string;
webhook_secret: string;
};
eventBus: EventEmitter;
};

const app = new Hono<{ Variables: HonoContextVariables }>();
Expand All @@ -368,12 +356,6 @@ app.get('/ping', (ctx) => {

/** Middleware */

app.use(async (ctx, next) => {
ctx.set('eventBus', eventBus);

return next();
});

app.use(async (ctx, next) => {
const extra: Record<string, string> = {};

Expand Down Expand Up @@ -553,7 +535,12 @@ app.use(async (ctx, next) => {

// This needs to go before the middleware which loads the site
// because this endpoint does not require the site to exist
app.post('/.ghost/activitypub/mq', spanWrapper(handleMessageAction));
if (queue instanceof GCloudPubSubMessageQueue) {
app.post(
'/.ghost/activitypub/mq',
spanWrapper(queue.handlePush.bind(queue)),
);
}

// This needs to go before the middleware which loads the site
// Because the site doesn't always exist - this is how it's created
Expand Down
2 changes: 0 additions & 2 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,3 @@ export const FOLLOWERS_PAGE_SIZE = 100;
export const FOLLOWING_PAGE_SIZE = 100;
export const LIKED_PAGE_SIZE = 100;
export const OUTBOX_PAGE_SIZE = 100;

export const EVENT_MQ_MESSAGE_RECEIVED = 'mq.message_received';
5 changes: 0 additions & 5 deletions src/helpers/gcloud-pubsub-mq.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { EventEmitter } from 'node:events';
import { PubSub } from '@google-cloud/pubsub';
import type { Logger } from '@logtape/logtape';

Expand Down Expand Up @@ -45,8 +44,6 @@ type InitGCloudPubSubMessageQueueOptions = {

export async function initGCloudPubSubMessageQueue(
logger: Logger,
eventBus: EventEmitter,
messageReceivedEventName: string,
{
host,
emulatorMode,
Expand Down Expand Up @@ -77,10 +74,8 @@ export async function initGCloudPubSubMessageQueue(

return new GCloudPubSubMessageQueue(
pubSubClient,
eventBus,
logger,
topicIdentifier,
subscriptionIdentifier,
messageReceivedEventName,
);
}
62 changes: 45 additions & 17 deletions src/mq/gcloud-pubsub-mq.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { EventEmitter } from 'node:events';
import type {
MessageQueue,
MessageQueueEnqueueOptions,
Expand All @@ -7,31 +6,27 @@ import type {
import type { PubSub } from '@google-cloud/pubsub';
import type { Logger } from '@logtape/logtape';
import * as Sentry from '@sentry/node';
import type { Context } from 'hono';

import type { MqMessageReceivedEvent } from '../events/mq-message-received-event';
import { Message } from './message';

export class GCloudPubSubMessageQueue implements MessageQueue {
private pubSubClient: PubSub;
private eventBus: EventEmitter;
private logger: Logger;
private topicIdentifier: string;
private subscriptionIdentifier: string;
private messageReceivedEventName: string;
private messageHandler: ((message: Message) => void) | null = null;

constructor(
pubSubClient: PubSub,
eventBus: EventEmitter,
logger: Logger,
topicIdentifier: string,
subscriptionIdentifier: string,
messageReceivedEventName: string,
) {
this.pubSubClient = pubSubClient;
this.eventBus = eventBus;
this.logger = logger;
this.topicIdentifier = topicIdentifier;
this.subscriptionIdentifier = subscriptionIdentifier;
this.messageReceivedEventName = messageReceivedEventName;
}

async enqueue(
Expand Down Expand Up @@ -78,26 +73,59 @@ export class GCloudPubSubMessageQueue implements MessageQueue {
handler: (message: any) => Promise<void> | void,
options: MessageQueueListenOptions = {},
): Promise<void> {
const messageHandler = (message: MqMessageReceivedEvent) => {
this.messageHandler = (message: Message) => {
this.handleMessage(message, handler);
};

this.eventBus.on(this.messageReceivedEventName, messageHandler);

return await new Promise((resolve) => {
options.signal?.addEventListener('abort', () => {
this.eventBus.removeListener(
this.messageReceivedEventName,
messageHandler,
);

resolve();
});
});
}

async handlePush(ctx: Context) {
const json = await ctx.req.json();
const pubSubId = json?.message?.message_id ?? 'unknown';

if (this.messageHandler === null) {
this.logger.info(
`Message handler is not initialised, nacking incoming message [PubSub ID: ${pubSubId}]`,
);

return new Response(null, { status: 429 });
}

return await new Promise<Response>((resolve) => {
let data = {};

try {
data = JSON.parse(
Buffer.from(json.message.data, 'base64').toString(),
);
} catch (error) {
this.logger.error(
`Failed to parse message data [PubSub ID: ${pubSubId}]: ${error}`,
);

return resolve(new Response(null, { status: 500 }));
}

this.messageHandler?.(
new Message({
id: json.message.message_id,
subscriptionIdentifier: json.subscription,
data,
attributes: json.message.attributes,
onAck: () => resolve(new Response(null, { status: 200 })),
onNack: () => resolve(new Response(null, { status: 500 })),
}),
);
});
}

private async handleMessage(
message: MqMessageReceivedEvent,
message: Message,
handler: (message: any) => Promise<void> | void,
) {
const fedifyId = message.attributes.fedifyId ?? 'unknown';
Expand Down
2 changes: 1 addition & 1 deletion src/mq/gcloud-pubsub-mq.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { Temporal } from '@js-temporal/polyfill';
import type { Logger } from '@logtape/logtape';
import { type Mock, beforeEach, describe, expect, it, vi } from 'vitest';

import { MqMessageReceivedEvent } from '../events/mq-message-received-event';
import { GCloudPubSubMessageQueue } from './gcloud-pubsub-mq';
import { MqMessageReceivedEvent } from './message';

const TOPIC_IDENTIFIER = 'topic';
const SUBSCRIPTION_IDENTIFIER = 'subscription';
Expand Down
6 changes: 3 additions & 3 deletions src/events/mq-message-received-event.ts → src/mq/message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
interface MqMessageReceivedEventOptions {
interface MessageOptions {
id: string;
subscriptionIdentifier: string;
data: Record<string, unknown>;
Expand All @@ -7,7 +7,7 @@ interface MqMessageReceivedEventOptions {
onNack: () => void;
}

export class MqMessageReceivedEvent {
export class Message {
readonly id: string;
readonly subscriptionIdentifier: string;
readonly data: Record<string, unknown>;
Expand All @@ -22,7 +22,7 @@ export class MqMessageReceivedEvent {
attributes,
onAck,
onNack,
}: MqMessageReceivedEventOptions) {
}: MessageOptions) {
this.id = id;
this.subscriptionIdentifier = subscriptionIdentifier;
this.data = data;
Expand Down

0 comments on commit 0a8a892

Please sign in to comment.