Skip to content

Commit

Permalink
Refactored GCloud Pub/Sub message queue implementation
Browse files Browse the repository at this point in the history
refs [AP-566](https://linear.app/ghost/issue/AP-566/implement-a-pubsub-backed-queue-for-fedify), [#143](#143)

Refactorings include:
- Renamed `gcloud-pubsub-mq` to `gcloud-pubsub-push-mq`
- Colocated all message queue related logic in `src/mq/gcloud-pubsub-push` for better reasonability / maintainability
- Reworked how the Hono middleware interfaces with the message queue (removed app-wide event bus)
- Reworked message queue error handling to not use Sentry directly
- Improved strcutured logging
- Improved test coverage
- Added inline documentation
  • Loading branch information
mike182uk committed Nov 16, 2024
1 parent 530121c commit 6c5053f
Show file tree
Hide file tree
Showing 11 changed files with 940 additions and 812 deletions.
54 changes: 0 additions & 54 deletions src/api/action/mq/handleMessage.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export { getActivitiesAction } from './action/getActivities';
export { getActivityThreadAction } from './action/getActivityThread';
export { handleMessageAction } from './action/mq/handleMessage';
export { profileGetAction } from './action/profile/get';
export { profileGetFollowersAction } from './action/profile/getFollowers';
export { profileGetFollowingAction } from './action/profile/getFollowing';
Expand Down
57 changes: 19 additions & 38 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import './instrumentation';

import { AsyncLocalStorage } from 'node:async_hooks';
import { createHmac } from 'node:crypto';
import { EventEmitter } from 'node:events';
import {
Accept,
Announce,
Expand All @@ -12,7 +11,6 @@ import {
Follow,
type KvStore,
Like,
type MessageQueue,
Note,
Undo,
Update,
Expand Down Expand Up @@ -40,7 +38,6 @@ import { behindProxy } from 'x-forwarded-fetch';
import {
getActivitiesAction,
getActivityThreadAction,
handleMessageAction,
profileGetAction,
profileGetFollowersAction,
profileGetFollowingAction,
Expand Down Expand Up @@ -78,10 +75,6 @@ import {
undoDispatcher,
updateDispatcher,
} from './dispatchers';
import { KnexKvStore } from './knex.kvstore';
import { scopeKvStore } from './kv-helpers';

import { EVENT_MQ_MESSAGE_RECEIVED } from './constants';
import {
followAction,
inboxHandler,
Expand All @@ -92,9 +85,15 @@ import {
unlikeAction,
} from './handlers';
import { getTraceAndSpanId } from './helpers/context-header';
import { initGCloudPubSubMessageQueue } from './helpers/gcloud-pubsub-mq';
import { getRequestData } from './helpers/request-data';
import { spanWrapper } from './instrumentation';
import { KnexKvStore } from './knex.kvstore';
import { scopeKvStore } from './kv-helpers';
import {
GCloudPubSubPushMessageQueue,
createMessageQueue,
handlePushMessage,
} from './mq/glcoud-pubsub-push/mq';

const logging = getLogger(['activitypub']);

Expand Down Expand Up @@ -157,34 +156,21 @@ export type ContextData = {

const fedifyKv = await KnexKvStore.create(client, 'key_value');

const eventBus = new EventEmitter();

let queue: MessageQueue | undefined;
let queue: GCloudPubSubPushMessageQueue | undefined;

if (process.env.USE_MQ === 'true') {
logging.info('Message queue is enabled');

try {
const host = String(process.env.MQ_PUBSUB_HOST) || undefined;
const emulatorMode = process.env.NODE_ENV !== 'production';
const projectId = String(process.env.MQ_PUBSUB_PROJECT_ID) || undefined;
const topicName = String(process.env.MQ_PUBSUB_TOPIC_NAME);
const subscriptionName = String(
process.env.MQ_PUBSUB_SUBSCRIPTION_NAME,
);
queue = await createMessageQueue(logging, {
pubSubHost: String(process.env.MQ_PUBSUB_HOST) || undefined,
hostIsEmulator: process.env.NODE_ENV !== 'production',
projectId: String(process.env.MQ_PUBSUB_PROJECT_ID) || undefined,
topic: String(process.env.MQ_PUBSUB_TOPIC_NAME),
subscription: String(process.env.MQ_PUBSUB_SUBSCRIPTION_NAME),
});

queue = await initGCloudPubSubMessageQueue(
logging,
eventBus,
EVENT_MQ_MESSAGE_RECEIVED,
{
host,
emulatorMode,
projectId,
topicName,
subscriptionName,
},
);
queue.registerErrorListener((error) => Sentry.captureException(error));
} catch (err) {
logging.error('Failed to initialise message queue {error}', {
error: err,
Expand Down Expand Up @@ -361,7 +347,6 @@ export type HonoContextVariables = {
host: string;
webhook_secret: string;
};
eventBus: EventEmitter;
};

const app = new Hono<{ Variables: HonoContextVariables }>();
Expand All @@ -374,12 +359,6 @@ app.get('/ping', (ctx) => {

/** Middleware */

app.use(async (ctx, next) => {
ctx.set('eventBus', eventBus);

return next();
});

app.use(async (ctx, next) => {
const extra: Record<string, string> = {};

Expand Down Expand Up @@ -559,7 +538,9 @@ app.use(async (ctx, next) => {

// This needs to go before the middleware which loads the site
// because this endpoint does not require the site to exist
app.post('/.ghost/activitypub/mq', spanWrapper(handleMessageAction));
if (queue instanceof GCloudPubSubPushMessageQueue) {
app.post('/.ghost/activitypub/mq', spanWrapper(handlePushMessage(queue)));
}

// This needs to go before the middleware which loads the site
// Because the site doesn't always exist - this is how it's created
Expand Down
2 changes: 0 additions & 2 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,3 @@ export const FOLLOWERS_PAGE_SIZE = 100;
export const FOLLOWING_PAGE_SIZE = 100;
export const LIKED_PAGE_SIZE = 100;
export const OUTBOX_PAGE_SIZE = 100;

export const EVENT_MQ_MESSAGE_RECEIVED = 'mq.message_received';
41 changes: 0 additions & 41 deletions src/events/mq-message-received-event.ts

This file was deleted.

99 changes: 0 additions & 99 deletions src/helpers/gcloud-pubsub-mq.ts

This file was deleted.

Loading

0 comments on commit 6c5053f

Please sign in to comment.