Skip to content

Commit

Permalink
Ensured activities are delivered to all followers
Browse files Browse the repository at this point in the history
refs [AP-638](https://linear.app/ghost/issue/AP-638/ghost-is-not-delivering-activities-to-all-followers)

Added logic to ensure that activities are delivered to all followers and not
just the first page of followers
  • Loading branch information
mike182uk committed Dec 17, 2024
1 parent 9b316a0 commit d1b1c6a
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 32 deletions.
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ services:
- MQ_PUBSUB_HOST=pubsub:8085
- MQ_PUBSUB_TOPIC_NAME=activitypub_topic_changeme
- MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription_changeme
- ACTIVITYPUB_COLLECTION_PAGE_SIZE=20
command: yarn build:watch
depends_on:
migrate:
Expand Down Expand Up @@ -115,6 +116,7 @@ services:
- MQ_PUBSUB_HOST=pubsub-testing:8085
- MQ_PUBSUB_TOPIC_NAME=activitypub_topic_changeme
- MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription_changeme
- ACTIVITYPUB_COLLECTION_PAGE_SIZE=2
command: yarn build:watch
depends_on:
mysql-testing:
Expand Down Expand Up @@ -166,6 +168,8 @@ services:
- MYSQL_USER=ghost
- MYSQL_PASSWORD=password
- MYSQL_DATABASE=activitypub
ports:
- "3308:3306"
healthcheck:
test: "mysql -ughost -ppassword activitypub -e 'select 1'"
interval: 1s
Expand Down
15 changes: 15 additions & 0 deletions features/followers.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Feature: Followers

Scenario: Activities are sent to all followers
Given we are followed by:
| name | type |
| Alice | Person |
| Bob | Person |
| Charlie | Person |
| Dave | Person |
And the list of followers is paginated across multiple pages
When we create a note "Note" with the content
"""
Hello, world!
"""
Then Activity "Note" is sent to all followers
101 changes: 101 additions & 0 deletions features/step_definitions/stepdefs.js
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,50 @@ Given('we follow {string}', async function (name) {
}
});

Given('we are followed by:', async function (actors) {
for (const { name, type } of actors.hashes()) {
// Create the actor
this.actors[name] = await createActor(name, { type });

// Create the follow activity
const actor = this.actors[name];
const object = this.actors.Us;
const activity = await createActivity('Follow', object, actor);

this.activities[name] = activity;
this.objects[name] = object;

// Send the follow activity to the inbox
this.response = await fetchActivityPub(
'http://fake-ghost-activitypub/.ghost/activitypub/inbox/index',
{
method: 'POST',
body: JSON.stringify(activity),
},
);

await waitForInboxActivity(activity);
}
});

Given('the list of followers is paginated across multiple pages', async () => {
const followersResponse = await fetchActivityPub(
'http://fake-ghost-activitypub/.ghost/activitypub/followers/index',
);
const followersResponseJson = await followersResponse.json();

const followersFirstPageReponse = await fetchActivityPub(
followersResponseJson.first,
);
const followersFirstPageReponseJson =
await followersFirstPageReponse.json();

assert(
followersFirstPageReponseJson.next,
'Expected multiple pages of pagination but only got 1',
);
});

When('we like the object {string}', async function (name) {
const id = this.objects[name].id;
this.response = await fetchActivityPub(
Expand Down Expand Up @@ -868,6 +912,63 @@ Then(
},
);

Then(
'Activity {string} is sent to all followers',
async function (activityName) {
// Retrieve all followers
const followers = [];

const followersResponse = await fetchActivityPub(
'http://fake-ghost-activitypub/.ghost/activitypub/followers/index',
);
const followersResponseJson = await followersResponse.json();

const followersFirstPageResponse = await fetchActivityPub(
followersResponseJson.first,
);
const followersFirstPageResponseJson =
await followersFirstPageResponse.json();

followers.push(...followersFirstPageResponseJson.orderedItems);

let nextPage = followersFirstPageResponseJson.next;

while (nextPage) {
const nextPageResponse = await fetchActivityPub(nextPage);
const nextPageResponseJson = await nextPageResponse.json();

followers.push(...nextPageResponseJson.orderedItems);

nextPage = nextPageResponseJson.next;
}

// Check that the activity was sent to all followers
const activity = this.activities[activityName];

for (const follower of followers) {
const inbox = new URL(follower.inbox);

const found = await waitForRequest(
'POST',
inbox.pathname,
(call) => {
const json = JSON.parse(call.request.body);

return (
json.type === activity.type &&
json.object.id === activity.object.id
);
},
);

assert(
found,
`Activity "${activityName}" was not sent to "${follower.name}"`,
);
}
},
);

const webhooks = {
'post.published': {
post: {
Expand Down
5 changes: 0 additions & 5 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,3 @@ export const ACTOR_DEFAULT_HANDLE = 'index';
export const ACTOR_DEFAULT_NAME = 'Local Ghost site';
export const ACTOR_DEFAULT_ICON = 'https://ghost.org/favicon.ico';
export const ACTOR_DEFAULT_SUMMARY = 'This is a summary';

export const FOLLOWERS_PAGE_SIZE = 20;
export const FOLLOWING_PAGE_SIZE = 20;
export const LIKED_PAGE_SIZE = 20;
export const OUTBOX_PAGE_SIZE = 20;
83 changes: 56 additions & 27 deletions src/dispatchers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@ import {
import * as Sentry from '@sentry/node';
import { v4 as uuidv4 } from 'uuid';
import { type ContextData, fedify } from './app';
import {
ACTOR_DEFAULT_HANDLE,
FOLLOWERS_PAGE_SIZE,
FOLLOWING_PAGE_SIZE,
LIKED_PAGE_SIZE,
OUTBOX_PAGE_SIZE,
} from './constants';
import { ACTOR_DEFAULT_HANDLE } from './constants';
import { isFollowing } from './helpers/activitypub/actor';
import { getUserData, getUserKeypair } from './helpers/user';
import { addToList } from './kv-helpers';
Expand Down Expand Up @@ -470,6 +464,20 @@ export async function followersDispatcher(
) {
ctx.data.logger.info('Followers Dispatcher');

if (cursor === null) {
ctx.data.logger.info('No cursor provided, returning early');

return null;
}

const pageSize = Number.parseInt(
process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE || '',
);

if (Number.isNaN(pageSize)) {
throw new Error(`Page size: ${pageSize} is not valid`);
}

const offset = Number.parseInt(cursor ?? '0');
let nextCursor: string | null = null;

Expand All @@ -484,35 +492,32 @@ export async function followersDispatcher(

if (fullResults) {
nextCursor =
fullResults.length > offset + FOLLOWERS_PAGE_SIZE
? (offset + FOLLOWERS_PAGE_SIZE).toString()
fullResults.length > offset + pageSize
? (offset + pageSize).toString()
: null;

items = fullResults.slice(offset, offset + FOLLOWERS_PAGE_SIZE);
items = fullResults.slice(offset, offset + pageSize);
} else {
const results = [
// Remove duplicates
...new Set((await ctx.data.db.get<string[]>(['followers'])) || []),
];

nextCursor =
results.length > offset + FOLLOWERS_PAGE_SIZE
? (offset + FOLLOWERS_PAGE_SIZE).toString()
results.length > offset + pageSize
? (offset + pageSize).toString()
: null;

const slicedResults = results.slice(
offset,
offset + FOLLOWERS_PAGE_SIZE,
);
const slicedResults = results.slice(offset, offset + pageSize);

const actors = (
await Promise.all(
slicedResults.map((result) => lookupActor(ctx, result)),
)
)
// This could potentially mean that the slicedResults is not the size
// of FOLLOWERS_PAGE_SIZE if for some reason the lookupActor returns
// null for some of the results. TODO: Find a better way to handle this
// of pageSize if for some reason the lookupActor returns null for
// some of the results. TODO: Find a better way to handle this
.filter((item): item is Actor => isActor(item));

const toStore = await Promise.all(
Expand Down Expand Up @@ -554,17 +559,25 @@ export async function followingDispatcher(
) {
ctx.data.logger.info('Following Dispatcher');

const pageSize = Number.parseInt(
process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE || '',
);

if (Number.isNaN(pageSize)) {
throw new Error(`Page size: ${pageSize} is not valid`);
}

const offset = Number.parseInt(cursor ?? '0');
let nextCursor: string | null = null;

const results = (await ctx.data.db.get<string[]>(['following'])) || [];

nextCursor =
results.length > offset + FOLLOWING_PAGE_SIZE
? (offset + FOLLOWING_PAGE_SIZE).toString()
results.length > offset + pageSize
? (offset + pageSize).toString()
: null;

const slicedResults = results.slice(offset, offset + FOLLOWING_PAGE_SIZE);
const slicedResults = results.slice(offset, offset + pageSize);

ctx.data.logger.info('Following results', { results: slicedResults });

Expand Down Expand Up @@ -611,6 +624,14 @@ export async function outboxDispatcher(
) {
ctx.data.logger.info('Outbox Dispatcher');

const pageSize = Number.parseInt(
process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE || '',
);

if (Number.isNaN(pageSize)) {
throw new Error(`Page size: ${pageSize} is not valid`);
}

const offset = Number.parseInt(cursor ?? '0');
let nextCursor: string | null = null;

Expand All @@ -619,11 +640,11 @@ export async function outboxDispatcher(
).reverse();

nextCursor =
results.length > offset + OUTBOX_PAGE_SIZE
? (offset + OUTBOX_PAGE_SIZE).toString()
results.length > offset + pageSize
? (offset + pageSize).toString()
: null;

const slicedResults = results.slice(offset, offset + OUTBOX_PAGE_SIZE);
const slicedResults = results.slice(offset, offset + pageSize);

ctx.data.logger.info('Outbox results', { results: slicedResults });

Expand Down Expand Up @@ -679,17 +700,25 @@ export async function likedDispatcher(
logger,
});

const pageSize = Number.parseInt(
process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE || '',
);

if (Number.isNaN(pageSize)) {
throw new Error(`Page size: ${pageSize} is not valid`);
}

const offset = Number.parseInt(cursor ?? '0');
let nextCursor: string | null = null;

const results = ((await db.get<string[]>(['liked'])) || []).reverse();

nextCursor =
results.length > offset + LIKED_PAGE_SIZE
? (offset + LIKED_PAGE_SIZE).toString()
results.length > offset + pageSize
? (offset + pageSize).toString()
: null;

const slicedResults = results.slice(offset, offset + LIKED_PAGE_SIZE);
const slicedResults = results.slice(offset, offset + pageSize);

ctx.data.logger.info('Liked results', { results: slicedResults });

Expand Down
12 changes: 12 additions & 0 deletions src/dispatchers.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ describe('dispatchers', () => {
ctx.data.globaldb.get.mockImplementation((key: string[]) => {
return Promise.resolve(following[key[0]]);
});

if (!process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE) {
process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE = '2';
}
});

it('returns items from the following collection in the correct order', async () => {
Expand Down Expand Up @@ -181,6 +185,10 @@ describe('dispatchers', () => {
ctx.data.globaldb.get.mockImplementation((key: string[]) => {
return Promise.resolve(likeActivities[key[0]]);
});

if (!process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE) {
process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE = '2';
}
});

it('returns items from the liked collection in the correct order', async () => {
Expand Down Expand Up @@ -358,6 +366,10 @@ describe('dispatchers', () => {
ctx.data.globaldb.get.mockImplementation((key: string[]) => {
return Promise.resolve(outboxActivities[key[0]]);
});

if (!process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE) {
process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE = '2';
}
});

it('returns items from the outbox collection in the correct order', async () => {
Expand Down

0 comments on commit d1b1c6a

Please sign in to comment.