Skip to content

Commit

Permalink
Use push subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
mike182uk committed Nov 13, 2024
1 parent 5d9e4d0 commit 011985f
Show file tree
Hide file tree
Showing 14 changed files with 531 additions and 374 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ services:
- PROJECT_ID=activitypub
- TOPIC_NAME=activitypub_topic_changeme
- SUBSCRIPTION_NAME=activitypub_subscription_changeme
- PUSH_ENDPOINT=http://activitypub-testing:8080/.ghost/activitypub/mq
healthcheck:
test: "curl -f http://localhost:8085 || exit 1"
interval: 1s
Expand Down Expand Up @@ -180,6 +181,7 @@ services:
- PROJECT_ID=activitypub
- TOPIC_NAME=activitypub_topic_changeme
- SUBSCRIPTION_NAME=activitypub_subscription_changeme
- PUSH_ENDPOINT=http://activitypub-testing:8083/.ghost/activitypub/mq
healthcheck:
test: "curl -f http://localhost:8085 || exit 1"
interval: 1s
Expand Down
4 changes: 2 additions & 2 deletions features/step_definitions/stepdefs.js
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ async function waitForInboxActivity(
delay: 0,
},
) {
const MAX_RETRIES = 5;
const MAX_RETRIES = 8;

const response = await fetchActivityPub(
'http://fake-ghost-activitypub/.ghost/activitypub/inbox/index',
Expand Down Expand Up @@ -737,7 +737,7 @@ async function waitForInboxActivity(

await waitForInboxActivity(activity, {
retryCount: options.retryCount + 1,
delay: options.delay + 100,
delay: options.delay + 500,
});
}

Expand Down
7 changes: 5 additions & 2 deletions pubsub/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ else
exit 1
fi

# Create the subscription via REST API
# Create the (push) subscription via REST API
if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_NAME} \
-H "Content-Type: application/json" \
-d '{
"topic": "projects/'${PROJECT_ID}'/topics/'${TOPIC_NAME}'"
"topic": "projects/'${PROJECT_ID}'/topics/'${TOPIC_NAME}'",
"pushConfig": {
"pushEndpoint": "'${PUSH_ENDPOINT}'"
}
}' | grep -q "200"; then
echo "Subscription created: ${SUBSCRIPTION_NAME}"
else
Expand Down
55 changes: 55 additions & 0 deletions src/api/action/mq/handleMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { Context } from 'hono';

import type { HonoContextVariables } from '../../../app';
import { EVENT_MQ_MESSAGE_RECEIVED } from '../../../constants';
import { MqMessageReceivedEvent } from '../../../events/mq-message-received-event';

export async function handleMessageAction(
ctx: Context<{ Variables: HonoContextVariables }>,
): Promise<Response> {
const logger = ctx.get('logger');
const eventBus = ctx.get('eventBus');

const json = await ctx.req.json();
const pubSubId = json?.message?.message_id ?? 'unknown';

// If no listeners are attached, we should not process the message
if (eventBus.listenerCount(EVENT_MQ_MESSAGE_RECEIVED) === 0) {
logger.info(
`No event listeners attached to [${EVENT_MQ_MESSAGE_RECEIVED}], nacking incoming message [PubSub ID: ${pubSubId}]`,
);

return new Response(null, { status: 429 });
}

// Return a promise that will eventually resolve or reject when the
// message is ack'd or nack'd
return await new Promise<Response>((resolve, reject) => {
let data = {};

try {
data = JSON.parse(
Buffer.from(json.message.data, 'base64').toString(),
);
} catch (error) {
logger.error(
`Failed to parse message data [PubSub ID: ${pubSubId}]: ${error}`,
);

return reject(new Response(null, { status: 500 }));
}

const event = new MqMessageReceivedEvent({
id: json.message.message_id,
subscriptionIdentifier: json.subscription,
data,
attributes: json.message.attributes,
onAck: () => resolve(new Response(null, { status: 200 })),
onNack: () => reject(new Response(null, { status: 500 })),
});

eventBus.emit(EVENT_MQ_MESSAGE_RECEIVED, event);

// TODO: Handle timeout
});
}
1 change: 1 addition & 0 deletions src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export { getActivitiesAction } from './action/getActivities';
export { getActivityThreadAction } from './action/getActivityThread';
export { handleMessageAction } from './action/mq/handleMessage';
export { profileGetAction } from './action/profile/get';
export { profileGetFollowersAction } from './action/profile/getFollowers';
export { profileGetFollowingAction } from './action/profile/getFollowing';
Expand Down
26 changes: 22 additions & 4 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import './instrumentation';

import { AsyncLocalStorage } from 'node:async_hooks';
import { createHmac } from 'node:crypto';
import { EventEmitter } from 'node:events';
import {
Accept,
Announce,
Expand Down Expand Up @@ -39,6 +40,7 @@ import { behindProxy } from 'x-forwarded-fetch';
import {
getActivitiesAction,
getActivityThreadAction,
handleMessageAction,
profileGetAction,
profileGetFollowersAction,
profileGetFollowingAction,
Expand Down Expand Up @@ -79,6 +81,7 @@ import {
import { KnexKvStore } from './knex.kvstore';
import { scopeKvStore } from './kv-helpers';

import { EVENT_MQ_MESSAGE_RECEIVED } from './constants';
import {
followAction,
inboxHandler,
Expand Down Expand Up @@ -154,13 +157,18 @@ export type ContextData = {

const fedifyKv = await KnexKvStore.create(client, 'key_value');

let messageQueue: MessageQueue | undefined;
const eventBus = new EventEmitter();

let queue: MessageQueue | undefined;

if (process.env.USE_MQ === 'true') {
logging.info('Message queue is enabled');

try {
messageQueue = await initGCloudPubSubMessageQueue(
queue = await initGCloudPubSubMessageQueue(
logging,
eventBus,
EVENT_MQ_MESSAGE_RECEIVED,
{
host: String(process.env.MQ_PUBSUB_HOST),
emulatorMode: process.env.NODE_ENV !== 'production',
Expand All @@ -170,7 +178,6 @@ if (process.env.USE_MQ === 'true') {
process.env.MQ_PUBSUB_SUBSCRIPTION_NAME,
),
},
logging,
);
} catch (err) {
logging.error('Failed to initialise message queue {error}', {
Expand All @@ -185,7 +192,7 @@ if (process.env.USE_MQ === 'true') {

export const fedify = createFederation<ContextData>({
kv: fedifyKv,
queue: messageQueue,
queue,
skipSignatureVerification:
process.env.SKIP_SIGNATURE_VERIFICATION === 'true' &&
['development', 'testing'].includes(process.env.NODE_ENV || ''),
Expand Down Expand Up @@ -348,6 +355,7 @@ export type HonoContextVariables = {
host: string;
webhook_secret: string;
};
eventBus: EventEmitter;
};

const app = new Hono<{ Variables: HonoContextVariables }>();
Expand All @@ -360,6 +368,12 @@ app.get('/ping', (ctx) => {

/** Middleware */

app.use(async (ctx, next) => {
ctx.set('eventBus', eventBus);

return next();
});

app.use(async (ctx, next) => {
const extra: Record<string, string> = {};

Expand Down Expand Up @@ -537,6 +551,10 @@ app.use(async (ctx, next) => {
await next();
});

// This needs to go before the middleware which loads the site
// because this endpoint does not require the site to exist
app.post('/.ghost/activitypub/mq', spanWrapper(handleMessageAction));

// This needs to go before the middleware which loads the site
// Because the site doesn't always exist - this is how it's created
app.get(
Expand Down
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export const FOLLOWERS_PAGE_SIZE = 100;
export const FOLLOWING_PAGE_SIZE = 100;
export const LIKED_PAGE_SIZE = 100;
export const OUTBOX_PAGE_SIZE = 100;

export const EVENT_MQ_MESSAGE_RECEIVED = 'mq.message_received';
41 changes: 41 additions & 0 deletions src/events/mq-message-received-event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
interface MqMessageReceivedEventOptions {
id: string;
subscriptionIdentifier: string;
data: Record<string, unknown>;
attributes: Record<string, string>;
onAck: () => void;
onNack: () => void;
}

export class MqMessageReceivedEvent {
readonly id: string;
readonly subscriptionIdentifier: string;
readonly data: Record<string, unknown>;
readonly attributes: Record<string, string>;
private onAck: () => void;
private onNack: () => void;

constructor({
id,
subscriptionIdentifier,
data,
attributes,
onAck,
onNack,
}: MqMessageReceivedEventOptions) {
this.id = id;
this.subscriptionIdentifier = subscriptionIdentifier;
this.data = data;
this.attributes = attributes;
this.onAck = onAck;
this.onNack = onNack;
}

ack() {
this.onAck();
}

nack() {
this.onNack();
}
}
127 changes: 0 additions & 127 deletions src/fedify/mq/gcloud-pubsub-mq.ts

This file was deleted.

Loading

0 comments on commit 011985f

Please sign in to comment.