Skip to content

Commit

Permalink
Merge branch 'main' into mike-ap-566-implement-a-pubsub-backed-queue-…
Browse files Browse the repository at this point in the history
…for-fedify
  • Loading branch information
mike182uk committed Nov 12, 2024
2 parents 5935d4e + f3429e4 commit 1c59413
Show file tree
Hide file tree
Showing 9 changed files with 736 additions and 125 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
- MYSQL_HOST=mysql
- MYSQL_PORT=3306
- MYSQL_DATABASE=activitypub
- NODE_ENV=testing
- NODE_ENV=development
- ALLOW_PRIVATE_ADDRESS=true
- SKIP_SIGNATURE_VERIFICATION=true
- USE_MQ=true
Expand Down
8 changes: 7 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"devDependencies": {
"@biomejs/biome": "1.9.4",
"@cucumber/cucumber": "10.9.0",
"@fedify/cli": "1.2.3",
"@types/jsonwebtoken": "9.0.7",
"@types/node": "20.17.6",
"@types/node-jose": "1.1.13",
Expand All @@ -42,14 +43,19 @@
"wiremock-captain": "3.5.0"
},
"dependencies": {
"@fedify/fedify": "1.2.3",
"@fedify/fedify": "1.2.4",
"@google-cloud/opentelemetry-cloud-trace-exporter": "2.4.1",
"@google-cloud/opentelemetry-cloud-trace-propagator": "0.20.0",
"@google-cloud/pubsub": "4.8.0",
"@hono/node-server": "1.13.5",
"@js-temporal/polyfill": "0.4.4",
"@logtape/logtape": "0.7.1",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/auto-instrumentations-node": "0.52.1",
"@opentelemetry/exporter-trace-otlp-proto": "0.54.2",
"@opentelemetry/sdk-node": "0.54.2",
"@opentelemetry/sdk-trace-base": "1.27.0",
"@opentelemetry/sdk-trace-node": "1.27.0",
"@sentry/node": "8.37.1",
"hono": "4.6.9",
"jsonwebtoken": "9.0.2",
Expand Down
5 changes: 4 additions & 1 deletion src/api/action/getActivities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Context } from 'hono';
import { type HonoContextVariables, fedify } from '../../app';
import { getActivityMeta } from '../../db';
import { buildActivity } from '../../helpers/activitypub/activity';
import { spanWrapper } from '../../instrumentation';

const DEFAULT_LIMIT = 10;

Expand Down Expand Up @@ -214,8 +215,10 @@ export async function getActivitiesAction(

const activities = await Promise.all(
paginatedRefs.map(async (ref) => {
const wrappedBuildActivity = spanWrapper(buildActivity);

try {
return await buildActivity(
return await wrappedBuildActivity(
ref,
globaldb,
apCtx,
Expand Down
33 changes: 24 additions & 9 deletions src/api/action/profile/get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,31 @@ export async function profileGetAction(

result.actor = await actor.toJsonLd();
result.actor.summary = sanitizeHtml(result.actor.summary);
result.actor.attachment = await getAttachments(actor, {
sanitizeValue: (value: string) => sanitizeHtml(value),
});
result.handle = getHandle(actor);
result.followerCount = await getFollowerCount(actor);
result.followingCount = await getFollowingCount(actor);
result.isFollowing = await isFollowing(actor, { db });
result.posts = await getRecentActivities(actor, {
sanitizeContent: (content: string) => sanitizeHtml(content),
});

const [
followerCount,
followingCount,
isFollowingResult,
posts,
attachments,
] = await Promise.all([
getFollowerCount(actor),
getFollowingCount(actor),
isFollowing(actor, { db }),
getRecentActivities(actor, {
sanitizeContent: (content: string) => sanitizeHtml(content),
}),
getAttachments(actor, {
sanitizeValue: (value: string) => sanitizeHtml(value),
}),
]);

result.followerCount = followerCount;
result.followingCount = followingCount;
result.isFollowing = isFollowingResult;
result.posts = posts;
result.actor.attachment = attachments;
} catch (err) {
logger.error('Profile retrieval failed ({handle}): {error}', {
handle,
Expand Down
22 changes: 5 additions & 17 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ export const fedify = createFederation<ContextData>({
queue: messageQueue,
skipSignatureVerification:
process.env.SKIP_SIGNATURE_VERIFICATION === 'true' &&
process.env.NODE_ENV === 'testing',
['development', 'testing'].includes(process.env.NODE_ENV || ''),
allowPrivateAddress:
process.env.ALLOW_PRIVATE_ADDRESS === 'true' &&
process.env.NODE_ENV === 'testing',
['development', 'testing'].includes(process.env.NODE_ENV || ''),
});

export const db = await KnexKvStore.create(client, 'key_value');
Expand Down Expand Up @@ -380,21 +380,9 @@ app.use(async (ctx, next) => {
return event;
});

return Sentry.startSpan(
{
op: 'http.server',
name: `${ctx.req.method} ${ctx.req.path}`,
attributes: {
...extra,
'service.name': 'activitypub',
},
},
() => {
return withContext(extra, () => {
return next();
});
},
);
return withContext(extra, () => {
return next();
});
});
});

Expand Down
63 changes: 38 additions & 25 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export const client = Knex({
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
},
pool: {
min: 1,
max: 25,
},
});

type ActivityMeta = {
Expand Down Expand Up @@ -125,18 +129,22 @@ export async function getActivityChildren(activity: ActivityJsonLd) {
const results = await client
.select('value')
.from('key_value')
// If inReplyTo is a string
.where(
client.raw(
`JSON_EXTRACT(value, "$.object.inReplyTo") = "${objectId}"`,
),
)
// If inReplyTo is an object
.orWhere(
client.raw(
`JSON_EXTRACT(value, "$.object.inReplyTo.id") = "${objectId}"`,
),
);
.where(function () {
// If inReplyTo is a string
this.where(
client.raw(
`JSON_EXTRACT(value, "$.object.inReplyTo") = "${objectId}"`,
),
);

// If inReplyTo is an object
this.orWhere(
client.raw(
`JSON_EXTRACT(value, "$.object.inReplyTo.id") = "${objectId}"`,
),
);
})
.andWhere(client.raw(`JSON_EXTRACT(value, "$.type") = "Create"`));

return results.map((result) => result.value);
}
Expand All @@ -147,18 +155,22 @@ export async function getActivityChildrenCount(activity: ActivityJsonLd) {
const result = await client
.count('* as count')
.from('key_value')
// If inReplyTo is a string
.where(
client.raw(
`JSON_EXTRACT(value, "$.object.inReplyTo") = "${objectId}"`,
),
)
// If inReplyTo is an object
.orWhere(
client.raw(
`JSON_EXTRACT(value, "$.object.inReplyTo.id") = "${objectId}"`,
),
);
.where(function () {
// If inReplyTo is a string
this.where(
client.raw(
`JSON_EXTRACT(value, "$.object.inReplyTo") = "${objectId}"`,
),
);

// If inReplyTo is an object
this.orWhere(
client.raw(
`JSON_EXTRACT(value, "$.object.inReplyTo.id") = "${objectId}"`,
),
);
})
.andWhere(client.raw(`JSON_EXTRACT(value, "$.type") = "Create"`));

return result[0].count;
}
Expand All @@ -174,7 +186,8 @@ export async function getActivityParents(activity: ActivityJsonLd) {
client.raw(
`JSON_EXTRACT(value, "$.object.id") = "${objectId}"`,
),
);
)
.andWhere(client.raw(`JSON_EXTRACT(value, "$.type") = "Create"`));

if (result.length === 1) {
const parent = result[0];
Expand Down
10 changes: 0 additions & 10 deletions src/dispatchers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ export async function handleAccept(ctx: Context<ContextData>, accept: Accept) {
ctx.data.logger.info('Handling Accept');
const parsed = (ctx as any).parseUri(accept.objectId);
ctx.data.logger.info('Parsed accept object', { parsed });
// biome-ignore lint/correctness/noConstantCondition: present when adding linting
if (false && parsed?.type !== 'follow') {
ctx.data.logger.info('Not accepting a follow - exit');
return;
}
if (!accept.id) {
ctx.data.logger.info('Accept missing id - exit');
return;
Expand All @@ -143,11 +138,6 @@ export async function handleCreate(ctx: Context<ContextData>, create: Create) {
ctx.data.logger.info('Handling Create');
const parsed = (ctx as any).parseUri(create.objectId);
ctx.data.logger.info('Parsed create object', { parsed });
// biome-ignore lint/correctness/noConstantCondition: present when adding linting
if (false && parsed?.type !== 'article') {
ctx.data.logger.info('Not accepting a follow - exit');
return;
}
if (!create.id) {
ctx.data.logger.info('Create missing id - exit');
return;
Expand Down
105 changes: 71 additions & 34 deletions src/instrumentation.ts
Original file line number Diff line number Diff line change
@@ -1,52 +1,89 @@
import { CloudPropagator } from '@google-cloud/opentelemetry-cloud-trace-propagator';
import { trace } from '@opentelemetry/api';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { NodeSDK } from '@opentelemetry/sdk-node';
import {
BatchSpanProcessor,
SimpleSpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import * as Sentry from '@sentry/node';

const sdk = new NodeSDK({
instrumentations: getNodeAutoInstrumentations({
'@opentelemetry/instrumentation-mysql2': {
addSqlCommenterCommentToQueries: true,
},
}),
});

const provider = new NodeTracerProvider();
let propagator: CloudPropagator | undefined;

if (process.env.K_SERVICE) {
const { TraceExporter } = await import(
'@google-cloud/opentelemetry-cloud-trace-exporter'
);
provider.addSpanProcessor(
new BatchSpanProcessor(
new TraceExporter({
resourceFilter: /.*/, // TODO: filter by our service name?
}),
),
);

propagator = new CloudPropagator();
}

if (process.env.NODE_ENV === 'development') {
const { OTLPTraceExporter } = await import(
'@opentelemetry/exporter-trace-otlp-proto'
);

provider.addSpanProcessor(
new SimpleSpanProcessor(
new OTLPTraceExporter({
url: 'http://jaeger:4318/v1/traces',
}),
),
);
}

provider.register({
propagator,
});

export const tracer = trace.getTracer('activitypub');

try {
sdk.start();
} catch (e) {
console.error(e);
}

if (process.env.SENTRY_DSN) {
const client = Sentry.init({
Sentry.init({
dsn: process.env.SENTRY_DSN,
environment: process.env.NODE_ENV || 'unknown',
release: process.env.K_REVISION,
tracesSampleRate: 1.0,
});

if (process.env.K_SERVICE) {
const { TraceExporter } = await import(
'@google-cloud/opentelemetry-cloud-trace-exporter'
);

client?.traceProvider?.addSpanProcessor(
new BatchSpanProcessor(new TraceExporter({})),
);
}

if (process.env.NODE_ENV === 'testing') {
const { OTLPTraceExporter } = await import(
'@opentelemetry/exporter-trace-otlp-proto'
);

client?.traceProvider?.addSpanProcessor(
new SimpleSpanProcessor(
new OTLPTraceExporter({
url: 'http://jaeger:4318/v1/traces',
}),
),
);
}
}

export function spanWrapper<TArgs extends unknown[], TReturn>(
fn: (...args: TArgs) => TReturn,
fn: (...args: TArgs) => TReturn | Promise<TReturn>,
) {
return (...args: TArgs) => {
return Sentry.startSpan(
{
op: 'fn',
name: fn.name,
},
() => fn(...args),
);
return tracer.startActiveSpan(fn.name || 'anonymous', async (span) => {
try {
const result = await Promise.resolve(fn(...args));
span.end();
return result;
} catch (error) {
span.recordException(error as Error);
span.setStatus({ code: 2 }); // OpenTelemetry ERROR status
span.end();
throw error;
}
});
};
}
Loading

0 comments on commit 1c59413

Please sign in to comment.