Skip to content

Commit

Permalink
feat: add more field to timeline message (#1023)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Feb 9, 2025
1 parent 911bfec commit fa80b4d
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 106 deletions.
33 changes: 14 additions & 19 deletions lib/timeline/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,35 @@
import { producer } from '@app/lib/kafka';
import { logger } from '@app/lib/logger';

import { type TimelineMessage, TimelineWriter } from './writer';

export async function handleTimelineMessage(op: string, details: string) {
switch (op) {
interface Payload {
op: string;
message: TimelineMessage[keyof TimelineMessage];
}

export async function handleTimelineMessage(_: string, value: string) {
const payload = JSON.parse(value) as Payload;

switch (payload.op) {
case 'subject': {
const payload = JSON.parse(details) as TimelineMessage['subject'];
await TimelineWriter.subject(payload);
await TimelineWriter.subject(payload.message as TimelineMessage['subject']);
break;
}
case 'progressEpisode': {
const payload = JSON.parse(details) as TimelineMessage['progressEpisode'];
await TimelineWriter.progressEpisode(payload);
await TimelineWriter.progressEpisode(payload.message as TimelineMessage['progressEpisode']);
break;
}
case 'progressSubject': {
const payload = JSON.parse(details) as TimelineMessage['progressSubject'];
await TimelineWriter.progressSubject(payload);
await TimelineWriter.progressSubject(payload.message as TimelineMessage['progressSubject']);
break;
}
case 'statusTsukkomi': {
const payload = JSON.parse(details) as TimelineMessage['statusTsukkomi'];
await TimelineWriter.statusTsukkomi(payload);
await TimelineWriter.statusTsukkomi(payload.message as TimelineMessage['statusTsukkomi']);
break;
}
default: {
logger.error(`Unknown timeline operation: ${op}`);
logger.error(`Unknown timeline operation: ${payload.op}`);
break;
}
}
}

export const TimelineEmitter = {
async emit<E extends keyof TimelineMessage>(op: E, details: TimelineMessage[E]): Promise<void> {
const value = JSON.stringify(details);
await producer.send('timeline', op, value);
},
};
179 changes: 93 additions & 86 deletions lib/timeline/writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { DateTime } from 'luxon';

import { db, op } from '@app/drizzle/db';
import * as schema from '@app/drizzle/schema';
import { BadRequestError, UnexpectedNotFoundError } from '@app/lib/error.ts';
import { BadRequestError } from '@app/lib/error.ts';
import { producer } from '@app/lib/kafka';
import { CollectionType, EpisodeCollectionStatus, SubjectType } from '@app/lib/subject/type';

import type * as memo from './memo';
Expand All @@ -19,60 +20,81 @@ import { TimelineCat, TimelineSource, TimelineStatusType } from './type';
*/
export interface TimelineMessage {
subject: {
userID: number;
subjectID: number;
uid: number;
subject: {
id: number;
type: SubjectType;
};
collect: {
id: number;
type: CollectionType;
rate: number;
comment: string;
};
createdAt: number;
};
progressEpisode: {
userID: number;
subjectID: number;
episodeID: number;
status: EpisodeCollectionStatus;
uid: number;
subject: {
id: number;
type: SubjectType;
};
episode: {
id: number;
status: EpisodeCollectionStatus;
};
createdAt: number;
};
progressSubject: {
userID: number;
subjectID: number;
epsUpdate?: number;
volsUpdate?: number;
uid: number;
subject: {
id: number;
type: SubjectType;
eps: number;
volumes: number;
};
collect: {
epsUpdate?: number;
volsUpdate?: number;
};
createdAt: number;
};
statusTsukkomi: {
userID: number;
uid: number;
text: string;
createdAt: number;
};
}

export const TimelineWriter = {
type TimelineKafkaSender = {
[T in keyof TimelineMessage]: (message: TimelineMessage[T]) => Promise<void>;
};

/** 写入时间轴的 Kafka Topic */
export const AsyncTimelineWriter: TimelineKafkaSender = new Proxy({} as TimelineKafkaSender, {
get: (_, op: keyof TimelineMessage) => {
return async (message: TimelineMessage[typeof op]) => {
const value = JSON.stringify({ op, message });
await producer.send('timeline', message.uid.toString(), value);
};
},
});

type TimelineDatabaseWriter = {
[T in keyof TimelineMessage]: (message: TimelineMessage[T]) => Promise<number>;
};

/** 写入时间轴的 MySQL 数据库表 */
export const TimelineWriter: TimelineDatabaseWriter = {
/** 收藏条目 */
async subject(payload: TimelineMessage['subject']): Promise<number> {
const [subject] = await db
.select()
.from(schema.chiiSubjects)
.where(op.eq(schema.chiiSubjects.id, payload.subjectID))
.limit(1);
if (!subject) {
throw new UnexpectedNotFoundError('subject not found');
}
const [interest] = await db
.select()
.from(schema.chiiSubjectInterests)
.where(
op.and(
op.eq(schema.chiiSubjectInterests.uid, payload.userID),
op.eq(schema.chiiSubjectInterests.subjectID, payload.subjectID),
op.ne(schema.chiiSubjectInterests.type, 0),
),
)
.limit(1);
if (!interest) {
throw new UnexpectedNotFoundError('interest not found');
}

const type = switchSubjectType(interest.type, subject.typeID);
const type = switchSubjectType(payload.collect.type, payload.subject.type);
const [previous] = await db
.select()
.from(schema.chiiTimeline)
.where(
op.and(
op.eq(schema.chiiTimeline.uid, payload.userID),
op.eq(schema.chiiTimeline.uid, payload.uid),
op.eq(schema.chiiTimeline.cat, TimelineCat.Subject),
op.eq(schema.chiiTimeline.type, type),
),
Expand All @@ -81,10 +103,10 @@ export const TimelineWriter = {
.limit(1);

const detail: memo.Subject = {
subject_id: payload.subjectID,
collect_id: interest.id,
collect_comment: lo.escape(interest.comment),
collect_rate: interest.rate,
subject_id: payload.subject.id,
collect_id: payload.collect.id,
collect_comment: lo.escape(payload.collect.comment),
collect_rate: payload.collect.rate,
};

if (previous && previous.createdAt > DateTime.now().minus({ minutes: 10 }).toUnixInteger()) {
Expand All @@ -111,56 +133,49 @@ export const TimelineWriter = {
return previous.id;
} else {
const [result] = await db.insert(schema.chiiTimeline).values({
uid: payload.userID,
uid: payload.uid,
cat: TimelineCat.Subject,
type,
related: payload.subjectID.toString(),
related: payload.subject.id.toString(),
memo: php.stringify(detail),
img: '',
batch: false,
source: TimelineSource.Next,
replies: 0,
createdAt: DateTime.now().toUnixInteger(),
createdAt: payload.createdAt,
});
return result.insertId;
}
},

/** 进度 - 剧集 */
async progressEpisode(payload: TimelineMessage['progressEpisode']): Promise<number> {
if (payload.status === EpisodeCollectionStatus.None) {
if (payload.episode.status === EpisodeCollectionStatus.None) {
throw new BadRequestError('episode status is none');
}
const [subject] = await db
.select()
.from(schema.chiiSubjects)
.where(op.eq(schema.chiiSubjects.id, payload.subjectID))
.limit(1);
if (!subject) {
throw new UnexpectedNotFoundError('subject not found');
}

const detail: memo.ProgressSingle = {
subject_id: payload.subjectID,
subject_type_id: subject.typeID,
ep_id: payload.episodeID,
subject_id: payload.subject.id,
subject_type_id: payload.subject.type,
ep_id: payload.episode.id,
};
const [previous] = await db
.select()
.from(schema.chiiTimeline)
.where(
op.and(
op.eq(schema.chiiTimeline.uid, payload.userID),
op.eq(schema.chiiTimeline.uid, payload.uid),
op.eq(schema.chiiTimeline.cat, TimelineCat.Progress),
),
)
.orderBy(op.desc(schema.chiiTimeline.id))
.limit(1);
if (
previous &&
previous.createdAt > DateTime.now().minus({ minutes: 15 }).toUnixInteger() &&
Number(previous.related) === payload.subjectID &&
previous.createdAt > DateTime.now().minus({ minutes: 10 }).toUnixInteger() &&
Number(previous.related) === payload.subject.id &&
!previous.batch &&
previous.type === payload.status
previous.type === payload.episode.status
) {
await db
.update(schema.chiiTimeline)
Expand All @@ -173,51 +188,43 @@ export const TimelineWriter = {
return previous.id;
} else {
const [result] = await db.insert(schema.chiiTimeline).values({
uid: payload.userID,
uid: payload.uid,
cat: TimelineCat.Progress,
type: payload.status,
related: payload.subjectID.toString(),
type: payload.episode.status,
related: payload.subject.id.toString(),
memo: php.stringify(detail),
img: '',
batch: false,
source: TimelineSource.Next,
replies: 0,
createdAt: DateTime.now().toUnixInteger(),
createdAt: payload.createdAt,
});
return result.insertId;
}
},

/** 进度 - 条目 */
async progressSubject(payload: TimelineMessage['progressSubject']) {
const [subject] = await db
.select()
.from(schema.chiiSubjects)
.where(op.eq(schema.chiiSubjects.id, payload.subjectID))
.limit(1);
if (!subject) {
throw new UnexpectedNotFoundError('subject not found');
}
const detail: memo.ProgressBatch = {
subject_id: payload.subjectID,
subject_type_id: subject.typeID,
eps_total: subject.eps === 0 ? '??' : subject.eps.toString(),
eps_update: payload.epsUpdate,
vols_total: subject.volumes === 0 ? '??' : subject.volumes.toString(),
vols_update: payload.volsUpdate,
subject_id: payload.subject.id,
subject_type_id: payload.subject.type,
eps_total: payload.subject.eps === 0 ? '??' : payload.subject.eps.toString(),
eps_update: payload.collect.epsUpdate,
vols_total: payload.subject.volumes === 0 ? '??' : payload.subject.volumes.toString(),
vols_update: payload.collect.volsUpdate,
};
const [previous] = await db
.select()
.from(schema.chiiTimeline)
.where(
op.and(
op.eq(schema.chiiTimeline.uid, payload.userID),
op.eq(schema.chiiTimeline.uid, payload.uid),
op.eq(schema.chiiTimeline.cat, TimelineCat.Progress),
op.eq(schema.chiiTimeline.type, 0),
),
)
.limit(1);
if (previous && previous.createdAt > DateTime.now().minus({ minutes: 15 }).toUnixInteger()) {
if (previous && previous.createdAt > DateTime.now().minus({ minutes: 10 }).toUnixInteger()) {
await db
.update(schema.chiiTimeline)
.set({
Expand All @@ -229,16 +236,16 @@ export const TimelineWriter = {
return previous.id;
} else {
const [result] = await db.insert(schema.chiiTimeline).values({
uid: payload.userID,
uid: payload.uid,
cat: TimelineCat.Progress,
type: 0,
related: payload.subjectID.toString(),
related: payload.subject.id.toString(),
memo: php.stringify(detail),
img: '',
batch: false,
source: TimelineSource.Next,
replies: 0,
createdAt: DateTime.now().toUnixInteger(),
createdAt: payload.createdAt,
});
return result.insertId;
}
Expand All @@ -247,7 +254,7 @@ export const TimelineWriter = {
/** 状态 - 吐槽 */
async statusTsukkomi(payload: TimelineMessage['statusTsukkomi']) {
const [result] = await db.insert(schema.chiiTimeline).values({
uid: payload.userID,
uid: payload.uid,
cat: TimelineCat.Status,
type: TimelineStatusType.Tsukkomi,
related: '',
Expand All @@ -256,7 +263,7 @@ export const TimelineWriter = {
batch: false,
source: TimelineSource.Next,
replies: 0,
createdAt: DateTime.now().toUnixInteger(),
createdAt: payload.createdAt,
});
return result.insertId;
},
Expand Down
7 changes: 6 additions & 1 deletion routes/private/routes/timeline.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Type as t } from '@sinclair/typebox';
import * as lo from 'lodash-es';
import { DateTime } from 'luxon';

import { NotAllowedError } from '@app/lib/auth';
import { Dam } from '@app/lib/dam';
Expand Down Expand Up @@ -102,7 +103,11 @@ export async function setup(app: App) {
}

await rateLimit(LimitAction.Timeline, auth.userID);
const id = await TimelineWriter.statusTsukkomi({ userID: auth.userID, text });
const id = await TimelineWriter.statusTsukkomi({
uid: auth.userID,
text,
createdAt: DateTime.now().toUnixInteger(),
});
return { id };
},
);
Expand Down

0 comments on commit fa80b4d

Please sign in to comment.