Skip to content

Commit

Permalink
Added GCloud Pub/Sub message queue implementation (#143)
Browse files Browse the repository at this point in the history
refs [AP-566](https://linear.app/ghost/issue/AP-566/implement-a-pubsub-backed-queue-for-fedify)

Added and configured a GCloud Pub/Sub message queue implementation for Fedify
  • Loading branch information
mike182uk authored Nov 14, 2024
1 parent 06e0d8e commit 40aca74
Show file tree
Hide file tree
Showing 17 changed files with 960 additions and 16 deletions.
28 changes: 26 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ services:
- NODE_ENV=development
- ALLOW_PRIVATE_ADDRESS=true
- SKIP_SIGNATURE_VERIFICATION=true
- USE_MQ=true
- MQ_PUBSUB_PROJECT_ID=activitypub
- MQ_PUBSUB_HOST=pubsub:8085
- MQ_PUBSUB_TOPIC_NAME=activitypub_topic_changeme
- MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription_changeme
command: yarn build:watch
depends_on:
migrate:
Expand Down Expand Up @@ -74,7 +79,14 @@ services:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:499.0.0-emulators
ports:
- "8085:8085"
command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=activitypub
command: /bin/bash -c "/opt/activitypub/start-pubsub.sh"
volumes:
- ./pubsub/start.sh:/opt/activitypub/start-pubsub.sh
environment:
- PROJECT_ID=activitypub
- TOPIC_NAME=activitypub_topic_changeme
- SUBSCRIPTION_NAME=activitypub_subscription_changeme
- PUSH_ENDPOINT=http://activitypub:8080/.ghost/activitypub/mq
healthcheck:
test: "curl -f http://localhost:8085 || exit 1"
interval: 1s
Expand All @@ -98,6 +110,11 @@ services:
- SKIP_SIGNATURE_VERIFICATION=true
- ALLOW_PRIVATE_ADDRESS=true
- NODE_TLS_REJECT_UNAUTHORIZED=0
- USE_MQ=true
- MQ_PUBSUB_PROJECT_ID=activitypub
- MQ_PUBSUB_HOST=pubsub-testing:8085
- MQ_PUBSUB_TOPIC_NAME=activitypub_topic_changeme
- MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription_changeme
command: yarn build:watch
depends_on:
mysql-testing:
Expand Down Expand Up @@ -157,7 +174,14 @@ services:
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:499.0.0-emulators
ports:
- "8086:8085"
command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=activitypub
command: /bin/bash -c "/opt/activitypub/start-pubsub.sh"
volumes:
- ./pubsub/start.sh:/opt/activitypub/start-pubsub.sh
environment:
- PROJECT_ID=activitypub
- TOPIC_NAME=activitypub_topic_changeme
- SUBSCRIPTION_NAME=activitypub_subscription_changeme
- PUSH_ENDPOINT=http://activitypub-testing:8083/.ghost/activitypub/mq
healthcheck:
test: "curl -f http://localhost:8085 || exit 1"
interval: 1s
Expand Down
3 changes: 3 additions & 0 deletions features/accept-follows.feature
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Feature: We automatically accept Follow requests
Given an Actor "Alice"
Given a "Follow(Us)" Activity "F" by "Alice"
When "Alice" sends "F" to the Inbox
And "F" is in our Inbox
Then an "Accept(F)" Activity "A" is created by "Us"
And Activity "A" is sent to "Alice"
And "Alice" is in our Followers
Expand All @@ -15,7 +16,9 @@ Feature: We automatically accept Follow requests
And a "Follow(Us)" Activity "F1" by "Alice"
And a "Follow(Us)" Activity "F2" by "Alice"
When "Alice" sends "F1" to the Inbox
And "F1" is in our Inbox
And "Alice" sends "F2" to the Inbox
And "F2" is in our Inbox
Then an "Accept(F1)" Activity "A1" is created by "Us"
And an "Accept(F2)" Activity "A2" is created by "Us"
And Activity "A1" is sent to "Alice"
Expand Down
1 change: 1 addition & 0 deletions features/follow-account.feature
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Feature: Follow accounts from their handle
Then the request is accepted
Given a "Accept(Follow(Alice))" Activity "A" by "Alice"
And "Alice" sends "A" to the Inbox
And "A" is in our Inbox
Given we follow "Alice"
Then the request is rejected with a 409

Expand Down
1 change: 1 addition & 0 deletions features/handle-announce.feature
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Feature: Announce(Note)
Then the request is accepted
Given a "Accept(Follow(Alice))" Activity "Accept" by "Alice"
And "Alice" sends "Accept" to the Inbox
And "Accept" is in our Inbox
Given a "Announce(Note)" Activity "A" by "Alice"
When "Alice" sends "A" to the Inbox
Then the request is accepted
Expand Down
1 change: 1 addition & 0 deletions features/handle-create-article.feature
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Feature: Create(Article)
Then the request is accepted
Given a "Accept(Follow(Alice))" Activity "Accept" by "Alice"
And "Alice" sends "Accept" to the Inbox
And "Accept" is in our Inbox
Given a "Create(Article)" Activity "A" by "Alice"
When "Alice" sends "A" to the Inbox
Then the request is accepted
Expand Down
8 changes: 8 additions & 0 deletions features/like-activity.feature
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ Feature: Liking an object
Then the request is accepted
Given a "Accept(Follow(Alice))" Activity "Accept" by "Alice"
And "Alice" sends "Accept" to the Inbox
And "Accept" is in our Inbox
Given a "Create(Note)" Activity "Note" by "Alice"
When "Alice" sends "Note" to the Inbox
And "Note" is in our Inbox
And we like the object "Note"
Then the request is accepted
And the object "Note" should be liked
Expand All @@ -23,8 +25,10 @@ Feature: Liking an object
Then the request is accepted
Given a "Accept(Follow(Alice))" Activity "Accept" by "Alice"
And "Alice" sends "Accept" to the Inbox
And "Accept" is in our Inbox
Given a "Create(Note)" Activity "Note" by "Alice"
When "Alice" sends "Note" to the Inbox
And "Note" is in our Inbox
And we like the object "Note"
Then the request is accepted
Then we like the object "Note"
Expand All @@ -36,8 +40,10 @@ Feature: Liking an object
Then the request is accepted
Given a "Accept(Follow(Alice))" Activity "Accept" by "Alice"
And "Alice" sends "Accept" to the Inbox
And "Accept" is in our Inbox
Given a "Create(Note)" Activity "Note" by "Alice"
When "Alice" sends "Note" to the Inbox
And "Note" is in our Inbox
Then we unlike the object "Note"
Then the request is rejected with a 409

Expand All @@ -47,8 +53,10 @@ Feature: Liking an object
Then the request is accepted
Given a "Accept(Follow(Alice))" Activity "Accept" by "Alice"
And "Alice" sends "Accept" to the Inbox
And "Accept" is in our Inbox
Given a "Create(Note)" Activity "Note" by "Alice"
When "Alice" sends "Note" to the Inbox
And "Note" is in our Inbox
And we like the object "Note"
Then the request is accepted
Then we unlike the object "Note"
Expand Down
54 changes: 41 additions & 13 deletions features/step_definitions/stepdefs.js
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ Before(async function () {
}
});

async function fetchActivityPub(url, options) {
async function fetchActivityPub(url, options = {}) {
if (!options.headers) {
options.headers = {};
}
Expand Down Expand Up @@ -702,6 +702,45 @@ async function waitForRequest(
return waitForRequest(method, path, matcher, step, milliseconds - step);
}

async function waitForInboxActivity(
activity,
options = {
retryCount: 0,
delay: 0,
},
) {
const MAX_RETRIES = 5;

const response = await fetchActivityPub(
'http://fake-ghost-activitypub/.ghost/activitypub/inbox/index',
{
headers: {
Accept: 'application/ld+json',
},
},
);
const inbox = await response.json();

if (inbox.items.find((item) => item.id === activity.id)) {
return;
}

if (options.retryCount === MAX_RETRIES) {
throw new Error(
`Max retries reached (${MAX_RETRIES}) when waiting on an activity in the inbox`,
);
}

if (options.delay > 0) {
await new Promise((resolve) => setTimeout(resolve, options.delay));
}

await waitForInboxActivity(activity, {
retryCount: options.retryCount + 1,
delay: options.delay + 500,
});
}

Then(
'Activity {string} is sent to {string}',
async function (activityName, actorName) {
Expand Down Expand Up @@ -869,20 +908,9 @@ Then('the found {string} has property {string}', function (name, prop) {
});

Then('{string} is in our Inbox', async function (activityName) {
const response = await fetchActivityPub(
'http://fake-ghost-activitypub/.ghost/activitypub/inbox/index',
{
headers: {
Accept: 'application/ld+json',
},
},
);
const inbox = await response.json();
const activity = this.activities[activityName];

const found = inbox.items.find((item) => item.id === activity.id);

assert(found);
await waitForInboxActivity(activity);
});

Then('{string} is not in our Inbox', async function (activityName) {
Expand Down
49 changes: 49 additions & 0 deletions pubsub/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/bin/bash

# This script is used to start the Pub/Sub emulator and create the required
# topic and subscription upfront (defined in the environment variables)
#
# See:
# https://cloud.google.com/pubsub/docs/emulator
# https://cloud.google.com/pubsub/docs/create-topic#pubsub_create_topic-rest
# https://cloud.google.com/pubsub/docs/create-push-subscription#pubsub_create_push_subscription-rest

# Ensure we explicitly set the host to 0.0.0.0:8085 so that the emulator will
# listen on all ip addresses and not just IPv6 (which is the default)
HOST=0.0.0.0:8085

# Start the emulator
gcloud beta emulators pubsub start --host-port=${HOST} --project=${PROJECT_ID} &

# Wait for the emulator to be ready
until curl -f http://${HOST}; do
echo "Waiting for Pub/Sub emulator to start..."

sleep 1
done

# Create the topic via REST API
if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/topics/${TOPIC_NAME} | grep -q "200"; then
echo "Topic created: ${TOPIC_NAME}"
else
echo "Failed to create topic: ${TOPIC_NAME}"
exit 1
fi

# Create the (push) subscription via REST API
if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_NAME} \
-H "Content-Type: application/json" \
-d '{
"topic": "projects/'${PROJECT_ID}'/topics/'${TOPIC_NAME}'",
"pushConfig": {
"pushEndpoint": "'${PUSH_ENDPOINT}'"
}
}' | grep -q "200"; then
echo "Subscription created: ${SUBSCRIPTION_NAME}"
else
echo "Failed to create subscription: ${SUBSCRIPTION_NAME}"
exit 1
fi

# Keep the container running
tail -f /dev/null
54 changes: 54 additions & 0 deletions src/api/action/mq/handleMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { Context } from 'hono';

import type { HonoContextVariables } from '../../../app';
import { EVENT_MQ_MESSAGE_RECEIVED } from '../../../constants';
import { MqMessageReceivedEvent } from '../../../events/mq-message-received-event';

export async function handleMessageAction(
ctx: Context<{ Variables: HonoContextVariables }>,
): Promise<Response> {
const logger = ctx.get('logger');
const eventBus = ctx.get('eventBus');

const json = await ctx.req.json();
const pubSubId = json?.message?.message_id ?? 'unknown';

// If no listeners are attached, we should not process the message
if (eventBus.listenerCount(EVENT_MQ_MESSAGE_RECEIVED) === 0) {
logger.info(
`No event listeners attached to [${EVENT_MQ_MESSAGE_RECEIVED}], nacking incoming message [PubSub ID: ${pubSubId}]`,
);

return new Response(null, { status: 429 });
}

// Return a promise that will eventually resolve when a message is ack'd or nack'd
return await new Promise<Response>((resolve) => {
let data = {};

try {
data = JSON.parse(
Buffer.from(json.message.data, 'base64').toString(),
);
} catch (error) {
logger.error(
`Failed to parse message data [PubSub ID: ${pubSubId}]: ${error}`,
);

return resolve(new Response(null, { status: 500 }));
}

const event = new MqMessageReceivedEvent({
id: json.message.message_id,
subscriptionIdentifier: json.subscription,
data,
attributes: json.message.attributes,
onAck: () => resolve(new Response(null, { status: 200 })),
onNack: () => resolve(new Response(null, { status: 500 })),
});

eventBus.emit(EVENT_MQ_MESSAGE_RECEIVED, event);

// TODO: Handle timeout
});
}
1 change: 1 addition & 0 deletions src/api/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export { getActivitiesAction } from './action/getActivities';
export { getActivityThreadAction } from './action/getActivityThread';
export { handleMessageAction } from './action/mq/handleMessage';
export { profileGetAction } from './action/profile/get';
export { profileGetFollowersAction } from './action/profile/getFollowers';
export { profileGetFollowingAction } from './action/profile/getFollowing';
Expand Down
Loading

0 comments on commit 40aca74

Please sign in to comment.