Skip to content

Commit

Permalink
Use push subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
mike182uk committed Nov 13, 2024
1 parent 5d9e4d0 commit ea77efd
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 145 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ services:
- PROJECT_ID=activitypub
- TOPIC_NAME=activitypub_topic_changeme
- SUBSCRIPTION_NAME=activitypub_subscription_changeme
- PUSH_ENDPOINT=http://activitypub:8080/.ghost/activitypub/mq
healthcheck:
test: "curl -f http://localhost:8085 || exit 1"
interval: 1s
Expand Down Expand Up @@ -180,6 +181,7 @@ services:
- PROJECT_ID=activitypub
- TOPIC_NAME=activitypub_topic_changeme
- SUBSCRIPTION_NAME=activitypub_subscription_changeme
- PUSH_ENDPOINT=http://activitypub-testing:8080/.ghost/activitypub/mq
healthcheck:
test: "curl -f http://localhost:8085 || exit 1"
interval: 1s
Expand Down
7 changes: 5 additions & 2 deletions pubsub/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ else
exit 1
fi

# Create the subscription via REST API
# Create the (push) subscription via REST API
if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_NAME} \
-H "Content-Type: application/json" \
-d '{
"topic": "projects/'${PROJECT_ID}'/topics/'${TOPIC_NAME}'"
"topic": "projects/'${PROJECT_ID}'/topics/'${TOPIC_NAME}'",
"pushConfig": {
"pushEndpoint": "'${PUSH_ENDPOINT}'"
}
}' | grep -q "200"; then
echo "Subscription created: ${SUBSCRIPTION_NAME}"
else
Expand Down
42 changes: 42 additions & 0 deletions src/api/action/mq/handleMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type { Context } from 'hono';

import type { HonoContextVariables } from '../../../app';
import { EVENT_MQ_MESSAGE_RECEIVED } from '../../../constants';
import { MqMessageReceivedEvent } from '../../../events/mq-message-received-event';

export async function handleMessageAction(
ctx: Context<{ Variables: HonoContextVariables }>,
): Promise<Response> {
const logger = ctx.get('logger');
const eventBus = ctx.get('eventBus');

const json = await ctx.req.json();

// If no listeners are attached, we should not process the message
if (eventBus.listenerCount(EVENT_MQ_MESSAGE_RECEIVED) === 0) {
logger.info(
`No event listeners attached to [${EVENT_MQ_MESSAGE_RECEIVED}], nacking incoming message [PubSub ID: ${json.message.message_id}]`,
);

return new Response(null, { status: 429 });
}

// Return a promise that will eventually resolve or reject when the
// message is ack'd or nack'd
return await new Promise<Response>((resolve, reject) => {
const event = new MqMessageReceivedEvent({
id: json.message.message_id,
subscriptionIdentifier: json.subscription,
data: JSON.parse(
Buffer.from(json.message.data, 'base64').toString(),
),
attributes: json.message.attributes,
onAck: () => resolve(new Response(null, { status: 200 })),
onNack: () => reject(new Response(null, { status: 500 })),
});

eventBus.emit(EVENT_MQ_MESSAGE_RECEIVED, event);

// TODO: Handle timeout
});
}
1 change: 1 addition & 0 deletions src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export { getActivitiesAction } from './action/getActivities';
export { getActivityThreadAction } from './action/getActivityThread';
export { handleMessageAction } from './action/mq/handleMessage';
export { profileGetAction } from './action/profile/get';
export { profileGetFollowersAction } from './action/profile/getFollowers';
export { profileGetFollowingAction } from './action/profile/getFollowing';
Expand Down
26 changes: 22 additions & 4 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import './instrumentation';

import { AsyncLocalStorage } from 'node:async_hooks';
import { createHmac } from 'node:crypto';
import { EventEmitter } from 'node:events';
import {
Accept,
Announce,
Expand Down Expand Up @@ -39,6 +40,7 @@ import { behindProxy } from 'x-forwarded-fetch';
import {
getActivitiesAction,
getActivityThreadAction,
handleMessageAction,
profileGetAction,
profileGetFollowersAction,
profileGetFollowingAction,
Expand Down Expand Up @@ -79,6 +81,7 @@ import {
import { KnexKvStore } from './knex.kvstore';
import { scopeKvStore } from './kv-helpers';

import { EVENT_MQ_MESSAGE_RECEIVED } from './constants';
import {
followAction,
inboxHandler,
Expand Down Expand Up @@ -154,13 +157,17 @@ export type ContextData = {

const fedifyKv = await KnexKvStore.create(client, 'key_value');

let messageQueue: MessageQueue | undefined;
const eventBus = new EventEmitter();

let queue: MessageQueue | undefined;

if (process.env.USE_MQ === 'true') {
logging.info('Message queue is enabled');

try {
messageQueue = await initGCloudPubSubMessageQueue(
queue = await initGCloudPubSubMessageQueue(
logging,
eventBus,
{
host: String(process.env.MQ_PUBSUB_HOST),
emulatorMode: process.env.NODE_ENV !== 'production',
Expand All @@ -170,7 +177,7 @@ if (process.env.USE_MQ === 'true') {
process.env.MQ_PUBSUB_SUBSCRIPTION_NAME,
),
},
logging,
EVENT_MQ_MESSAGE_RECEIVED,
);
} catch (err) {
logging.error('Failed to initialise message queue {error}', {
Expand All @@ -185,7 +192,7 @@ if (process.env.USE_MQ === 'true') {

export const fedify = createFederation<ContextData>({
kv: fedifyKv,
queue: messageQueue,
queue,
skipSignatureVerification:
process.env.SKIP_SIGNATURE_VERIFICATION === 'true' &&
['development', 'testing'].includes(process.env.NODE_ENV || ''),
Expand Down Expand Up @@ -348,6 +355,7 @@ export type HonoContextVariables = {
host: string;
webhook_secret: string;
};
eventBus: EventEmitter;
};

const app = new Hono<{ Variables: HonoContextVariables }>();
Expand All @@ -360,6 +368,12 @@ app.get('/ping', (ctx) => {

/** Middleware */

app.use(async (ctx, next) => {
ctx.set('eventBus', eventBus);

return next();
});

app.use(async (ctx, next) => {
const extra: Record<string, string> = {};

Expand Down Expand Up @@ -537,6 +551,10 @@ app.use(async (ctx, next) => {
await next();
});

// This needs to go before the middleware which loads the site
// because this endpoint does not require the site to exist
app.post('/.ghost/activitypub/mq', spanWrapper(handleMessageAction));

// This needs to go before the middleware which loads the site
// Because the site doesn't always exist - this is how it's created
app.get(
Expand Down
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export const FOLLOWERS_PAGE_SIZE = 100;
export const FOLLOWING_PAGE_SIZE = 100;
export const LIKED_PAGE_SIZE = 100;
export const OUTBOX_PAGE_SIZE = 100;

export const EVENT_MQ_MESSAGE_RECEIVED = 'mq.message_received';
41 changes: 41 additions & 0 deletions src/events/mq-message-received-event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
interface MqMessageReceivedEventOptions {
id: string;
subscriptionIdentifier: string;
data: string;
attributes: Record<string, string>;
onAck: () => void;
onNack: () => void;
}

export class MqMessageReceivedEvent {
readonly id: string;
readonly subscriptionIdentifier: string;
readonly data: string;
readonly attributes: Record<string, string>;
private onAck: () => void;
private onNack: () => void;

constructor({
id,
subscriptionIdentifier,
data,
attributes,
onAck,
onNack,
}: MqMessageReceivedEventOptions) {
this.id = id;
this.subscriptionIdentifier = subscriptionIdentifier;
this.data = data;
this.attributes = attributes;
this.onAck = onAck;
this.onNack = onNack;
}

ack() {
this.onAck();
}

nack() {
this.onNack();
}
}
12 changes: 9 additions & 3 deletions src/helpers/gcloud-pubsub-mq.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { EventEmitter } from 'node:events';
import { PubSub } from '@google-cloud/pubsub';
import type { Logger } from '@logtape/logtape';
import { GCloudPubSubMessageQueue } from '../fedify/mq/gcloud-pubsub-mq';

import { GCloudPubSubMessageQueue } from '../mq/gcloud-pubsub-mq';

export function getFullTopicIdentifier(
projectId: string,
Expand Down Expand Up @@ -42,14 +44,16 @@ type InitGCloudPubSubMessageQueueOptions = {
};

export async function initGCloudPubSubMessageQueue(
logger: Logger,
eventBus: EventEmitter,
{
host,
emulatorMode,
projectId,
topicName = 'unknown_topic',
subscriptionName = 'unknown_subscription',
}: InitGCloudPubSubMessageQueueOptions,
logger: Logger,
eventBusMqMessageReceivedName: string,
) {
const pubSubClient = new PubSub({
projectId,
Expand All @@ -73,8 +77,10 @@ export async function initGCloudPubSubMessageQueue(

return new GCloudPubSubMessageQueue(
pubSubClient,
eventBus,
logger,
topicIdentifier,
subscriptionIdentifier,
logger,
eventBusMqMessageReceivedName,
);
}
37 changes: 31 additions & 6 deletions src/helpers/gcloud-pubsub-mq.unit.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { EventEmitter } from 'node:events';
import { PubSub, type Subscription, type Topic } from '@google-cloud/pubsub';
import type { Logger } from '@logtape/logtape';
import { type Mock, beforeEach, describe, expect, it, vi } from 'vitest';
import { GCloudPubSubMessageQueue } from '../fedify/mq/gcloud-pubsub-mq';
import { GCloudPubSubMessageQueue } from '../mq/gcloud-pubsub-mq';
import {
getFullSubscriptionIdentifier,
getFullTopicIdentifier,
Expand All @@ -14,20 +15,27 @@ vi.mock('@google-cloud/pubsub', () => {
};
});

vi.mock('../fedify/mq/gcloud-pubsub-mq', () => {
vi.mock('../mq/gcloud-pubsub-mq', () => {
return {
GCloudPubSubMessageQueue: vi.fn(),
};
});

const EVENT_BUS_EVENT_NAME = 'event';

describe('initGCloudPubSubMessageQueue', () => {
let mockLogger: Logger;
let mockEventBus: EventEmitter;

beforeEach(() => {
mockLogger = {
info: vi.fn(),
} as unknown as Logger;

mockEventBus = {
on: vi.fn(),
} as unknown as EventEmitter;

vi.resetAllMocks();
});

Expand Down Expand Up @@ -64,7 +72,12 @@ describe('initGCloudPubSubMessageQueue', () => {
});

await expect(
initGCloudPubSubMessageQueue(options, mockLogger),
initGCloudPubSubMessageQueue(
mockLogger,
mockEventBus,
options,
EVENT_BUS_EVENT_NAME,
),
).resolves.toBeInstanceOf(GCloudPubSubMessageQueue);

expect(PubSub).toHaveBeenCalledWith({
Expand All @@ -75,12 +88,14 @@ describe('initGCloudPubSubMessageQueue', () => {

expect(GCloudPubSubMessageQueue).toHaveBeenCalledWith(
expect.any(Object),
mockEventBus,
mockLogger,
getFullTopicIdentifier(options.projectId, options.topicName),
getFullSubscriptionIdentifier(
options.projectId,
options.subscriptionName,
),
mockLogger,
EVENT_BUS_EVENT_NAME,
);
});

Expand All @@ -101,7 +116,12 @@ describe('initGCloudPubSubMessageQueue', () => {
});

await expect(
initGCloudPubSubMessageQueue(options, mockLogger),
initGCloudPubSubMessageQueue(
mockLogger,
mockEventBus,
options,
EVENT_BUS_EVENT_NAME,
),
).rejects.toThrow(`Topic does not exist: ${options.topicName}`);
});

Expand Down Expand Up @@ -131,7 +151,12 @@ describe('initGCloudPubSubMessageQueue', () => {
});

await expect(
initGCloudPubSubMessageQueue(options, mockLogger),
initGCloudPubSubMessageQueue(
mockLogger,
mockEventBus,
options,
EVENT_BUS_EVENT_NAME,
),
).rejects.toThrow(
`Subscription does not exist: ${options.subscriptionName}`,
);
Expand Down
Loading

0 comments on commit ea77efd

Please sign in to comment.