diff --git a/docker-compose.yml b/docker-compose.yml index de5d5622..0e5c78de 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,6 +18,7 @@ services: - MQ_PUBSUB_HOST=pubsub:8085 - MQ_PUBSUB_TOPIC_NAME=activitypub_topic_changeme - MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription_changeme + - ACTIVITYPUB_COLLECTION_PAGE_SIZE=20 command: yarn build:watch depends_on: migrate: @@ -115,6 +116,7 @@ services: - MQ_PUBSUB_HOST=pubsub-testing:8085 - MQ_PUBSUB_TOPIC_NAME=activitypub_topic_changeme - MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription_changeme + - ACTIVITYPUB_COLLECTION_PAGE_SIZE=2 command: yarn build:watch depends_on: mysql-testing: @@ -166,6 +168,8 @@ services: - MYSQL_USER=ghost - MYSQL_PASSWORD=password - MYSQL_DATABASE=activitypub + ports: + - "3308:3306" healthcheck: test: "mysql -ughost -ppassword activitypub -e 'select 1'" interval: 1s diff --git a/features/followers.feature b/features/followers.feature new file mode 100644 index 00000000..0cffcc90 --- /dev/null +++ b/features/followers.feature @@ -0,0 +1,15 @@ +Feature: Followers + + Scenario: Activities are sent to all followers + Given we are followed by: + | name | type | + | Alice | Person | + | Bob | Person | + | Charlie | Person | + | Dave | Person | + And the list of followers is paginated across multiple pages + When we create a note "Note" with the content + """ + Hello, world! + """ + Then Activity "Note" is sent to all followers diff --git a/features/step_definitions/stepdefs.js b/features/step_definitions/stepdefs.js index 67e48bc2..1bfee14c 100644 --- a/features/step_definitions/stepdefs.js +++ b/features/step_definitions/stepdefs.js @@ -524,6 +524,51 @@ Given('we follow {string}', async function (name) { } }); +Given('we are followed by:', async function (actors) { + for (const { name, type } of actors.hashes()) { + // Create the actor + this.actors[name] = await createActor(name, { type }); + + // Create the follow activity + const actor = this.actors[name]; + const object = this.actors.Us; + const activity = await createActivity('Follow', object, actor); + + const key = `Follow(Us)_${name}`; + this.activities[key] = activity; + this.objects[key] = object; + + // Send the follow activity to the inbox + this.response = await fetchActivityPub( + 'http://fake-ghost-activitypub/.ghost/activitypub/inbox/index', + { + method: 'POST', + body: JSON.stringify(activity), + }, + ); + + await waitForInboxActivity(activity); + } +}); + +Given('the list of followers is paginated across multiple pages', async () => { + const followersResponse = await fetchActivityPub( + 'http://fake-ghost-activitypub/.ghost/activitypub/followers/index', + ); + const followersResponseJson = await followersResponse.json(); + + const followersFirstPageReponse = await fetchActivityPub( + followersResponseJson.first, + ); + const followersFirstPageReponseJson = + await followersFirstPageReponse.json(); + + assert( + followersFirstPageReponseJson.next, + 'Expected multiple pages of pagination but only got 1', + ); +}); + When('we like the object {string}', async function (name) { const id = this.objects[name].id; this.response = await fetchActivityPub( @@ -868,6 +913,63 @@ Then( }, ); +Then( + 'Activity {string} is sent to all followers', + async function (activityName) { + // Retrieve all followers + const followers = []; + + const followersResponse = await fetchActivityPub( + 'http://fake-ghost-activitypub/.ghost/activitypub/followers/index', + ); + const followersResponseJson = await followersResponse.json(); + + const followersFirstPageResponse = await fetchActivityPub( + followersResponseJson.first, + ); + const followersFirstPageResponseJson = + await followersFirstPageResponse.json(); + + followers.push(...followersFirstPageResponseJson.orderedItems); + + let nextPage = followersFirstPageResponseJson.next; + + while (nextPage) { + const nextPageResponse = await fetchActivityPub(nextPage); + const nextPageResponseJson = await nextPageResponse.json(); + + followers.push(...nextPageResponseJson.orderedItems); + + nextPage = nextPageResponseJson.next; + } + + // Check that the activity was sent to all followers + const activity = this.activities[activityName]; + + for (const follower of followers) { + const inbox = new URL(follower.inbox); + + const found = await waitForRequest( + 'POST', + inbox.pathname, + (call) => { + const json = JSON.parse(call.request.body); + + return ( + json.type === activity.type && + json.object.id === activity.object.id + ); + }, + ); + + assert( + found, + `Activity "${activityName}" was not sent to "${follower.name}"`, + ); + } + }, +); + const webhooks = { 'post.published': { post: { diff --git a/src/constants.ts b/src/constants.ts index bee62924..de19c3e5 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -2,8 +2,3 @@ export const ACTOR_DEFAULT_HANDLE = 'index'; export const ACTOR_DEFAULT_NAME = 'Local Ghost site'; export const ACTOR_DEFAULT_ICON = 'https://ghost.org/favicon.ico'; export const ACTOR_DEFAULT_SUMMARY = 'This is a summary'; - -export const FOLLOWERS_PAGE_SIZE = 20; -export const FOLLOWING_PAGE_SIZE = 20; -export const LIKED_PAGE_SIZE = 20; -export const OUTBOX_PAGE_SIZE = 20; diff --git a/src/dispatchers.ts b/src/dispatchers.ts index dc333369..6eecbea5 100644 --- a/src/dispatchers.ts +++ b/src/dispatchers.ts @@ -23,13 +23,7 @@ import { import * as Sentry from '@sentry/node'; import { v4 as uuidv4 } from 'uuid'; import { type ContextData, fedify } from './app'; -import { - ACTOR_DEFAULT_HANDLE, - FOLLOWERS_PAGE_SIZE, - FOLLOWING_PAGE_SIZE, - LIKED_PAGE_SIZE, - OUTBOX_PAGE_SIZE, -} from './constants'; +import { ACTOR_DEFAULT_HANDLE } from './constants'; import { isFollowing } from './helpers/activitypub/actor'; import { getUserData, getUserKeypair } from './helpers/user'; import { addToList } from './kv-helpers'; @@ -470,6 +464,20 @@ export async function followersDispatcher( ) { ctx.data.logger.info('Followers Dispatcher'); + if (cursor === null) { + ctx.data.logger.info('No cursor provided, returning early'); + + return null; + } + + const pageSize = Number.parseInt( + process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE || '', + ); + + if (Number.isNaN(pageSize)) { + throw new Error(`Page size: ${pageSize} is not valid`); + } + const offset = Number.parseInt(cursor ?? '0'); let nextCursor: string | null = null; @@ -484,11 +492,11 @@ export async function followersDispatcher( if (fullResults) { nextCursor = - fullResults.length > offset + FOLLOWERS_PAGE_SIZE - ? (offset + FOLLOWERS_PAGE_SIZE).toString() + fullResults.length > offset + pageSize + ? (offset + pageSize).toString() : null; - items = fullResults.slice(offset, offset + FOLLOWERS_PAGE_SIZE); + items = fullResults.slice(offset, offset + pageSize); } else { const results = [ // Remove duplicates @@ -496,14 +504,11 @@ export async function followersDispatcher( ]; nextCursor = - results.length > offset + FOLLOWERS_PAGE_SIZE - ? (offset + FOLLOWERS_PAGE_SIZE).toString() + results.length > offset + pageSize + ? (offset + pageSize).toString() : null; - const slicedResults = results.slice( - offset, - offset + FOLLOWERS_PAGE_SIZE, - ); + const slicedResults = results.slice(offset, offset + pageSize); const actors = ( await Promise.all( @@ -511,8 +516,8 @@ export async function followersDispatcher( ) ) // This could potentially mean that the slicedResults is not the size - // of FOLLOWERS_PAGE_SIZE if for some reason the lookupActor returns - // null for some of the results. TODO: Find a better way to handle this + // of pageSize if for some reason the lookupActor returns null for + // some of the results. TODO: Find a better way to handle this .filter((item): item is Actor => isActor(item)); const toStore = await Promise.all( @@ -554,17 +559,25 @@ export async function followingDispatcher( ) { ctx.data.logger.info('Following Dispatcher'); + const pageSize = Number.parseInt( + process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE || '', + ); + + if (Number.isNaN(pageSize)) { + throw new Error(`Page size: ${pageSize} is not valid`); + } + const offset = Number.parseInt(cursor ?? '0'); let nextCursor: string | null = null; const results = (await ctx.data.db.get(['following'])) || []; nextCursor = - results.length > offset + FOLLOWING_PAGE_SIZE - ? (offset + FOLLOWING_PAGE_SIZE).toString() + results.length > offset + pageSize + ? (offset + pageSize).toString() : null; - const slicedResults = results.slice(offset, offset + FOLLOWING_PAGE_SIZE); + const slicedResults = results.slice(offset, offset + pageSize); ctx.data.logger.info('Following results', { results: slicedResults }); @@ -611,6 +624,14 @@ export async function outboxDispatcher( ) { ctx.data.logger.info('Outbox Dispatcher'); + const pageSize = Number.parseInt( + process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE || '', + ); + + if (Number.isNaN(pageSize)) { + throw new Error(`Page size: ${pageSize} is not valid`); + } + const offset = Number.parseInt(cursor ?? '0'); let nextCursor: string | null = null; @@ -619,11 +640,11 @@ export async function outboxDispatcher( ).reverse(); nextCursor = - results.length > offset + OUTBOX_PAGE_SIZE - ? (offset + OUTBOX_PAGE_SIZE).toString() + results.length > offset + pageSize + ? (offset + pageSize).toString() : null; - const slicedResults = results.slice(offset, offset + OUTBOX_PAGE_SIZE); + const slicedResults = results.slice(offset, offset + pageSize); ctx.data.logger.info('Outbox results', { results: slicedResults }); @@ -679,17 +700,25 @@ export async function likedDispatcher( logger, }); + const pageSize = Number.parseInt( + process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE || '', + ); + + if (Number.isNaN(pageSize)) { + throw new Error(`Page size: ${pageSize} is not valid`); + } + const offset = Number.parseInt(cursor ?? '0'); let nextCursor: string | null = null; const results = ((await db.get(['liked'])) || []).reverse(); nextCursor = - results.length > offset + LIKED_PAGE_SIZE - ? (offset + LIKED_PAGE_SIZE).toString() + results.length > offset + pageSize + ? (offset + pageSize).toString() : null; - const slicedResults = results.slice(offset, offset + LIKED_PAGE_SIZE); + const slicedResults = results.slice(offset, offset + pageSize); ctx.data.logger.info('Liked results', { results: slicedResults }); diff --git a/src/dispatchers.unit.test.ts b/src/dispatchers.unit.test.ts index 5ad165b6..62f1c161 100644 --- a/src/dispatchers.unit.test.ts +++ b/src/dispatchers.unit.test.ts @@ -81,6 +81,10 @@ describe('dispatchers', () => { ctx.data.globaldb.get.mockImplementation((key: string[]) => { return Promise.resolve(following[key[0]]); }); + + if (!process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE) { + process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE = '2'; + } }); it('returns items from the following collection in the correct order', async () => { @@ -181,6 +185,10 @@ describe('dispatchers', () => { ctx.data.globaldb.get.mockImplementation((key: string[]) => { return Promise.resolve(likeActivities[key[0]]); }); + + if (!process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE) { + process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE = '2'; + } }); it('returns items from the liked collection in the correct order', async () => { @@ -358,6 +366,10 @@ describe('dispatchers', () => { ctx.data.globaldb.get.mockImplementation((key: string[]) => { return Promise.resolve(outboxActivities[key[0]]); }); + + if (!process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE) { + process.env.ACTIVITYPUB_COLLECTION_PAGE_SIZE = '2'; + } }); it('returns items from the outbox collection in the correct order', async () => {