diff --git a/lib/timeline/kafka.ts b/lib/timeline/kafka.ts index 3618c91e8..6d98dd850 100644 --- a/lib/timeline/kafka.ts +++ b/lib/timeline/kafka.ts @@ -1,40 +1,35 @@ -import { producer } from '@app/lib/kafka'; import { logger } from '@app/lib/logger'; import { type TimelineMessage, TimelineWriter } from './writer'; -export async function handleTimelineMessage(op: string, details: string) { - switch (op) { +interface Payload { + op: string; + message: TimelineMessage[keyof TimelineMessage]; +} + +export async function handleTimelineMessage(_: string, value: string) { + const payload = JSON.parse(value) as Payload; + + switch (payload.op) { case 'subject': { - const payload = JSON.parse(details) as TimelineMessage['subject']; - await TimelineWriter.subject(payload); + await TimelineWriter.subject(payload.message as TimelineMessage['subject']); break; } case 'progressEpisode': { - const payload = JSON.parse(details) as TimelineMessage['progressEpisode']; - await TimelineWriter.progressEpisode(payload); + await TimelineWriter.progressEpisode(payload.message as TimelineMessage['progressEpisode']); break; } case 'progressSubject': { - const payload = JSON.parse(details) as TimelineMessage['progressSubject']; - await TimelineWriter.progressSubject(payload); + await TimelineWriter.progressSubject(payload.message as TimelineMessage['progressSubject']); break; } case 'statusTsukkomi': { - const payload = JSON.parse(details) as TimelineMessage['statusTsukkomi']; - await TimelineWriter.statusTsukkomi(payload); + await TimelineWriter.statusTsukkomi(payload.message as TimelineMessage['statusTsukkomi']); break; } default: { - logger.error(`Unknown timeline operation: ${op}`); + logger.error(`Unknown timeline operation: ${payload.op}`); break; } } } - -export const TimelineEmitter = { - async emit(op: E, details: TimelineMessage[E]): Promise { - const value = JSON.stringify(details); - await producer.send('timeline', op, value); - }, -}; diff --git a/lib/timeline/writer.ts b/lib/timeline/writer.ts index 80c2bdc7f..c0b47dc74 100644 --- a/lib/timeline/writer.ts +++ b/lib/timeline/writer.ts @@ -4,7 +4,8 @@ import { DateTime } from 'luxon'; import { db, op } from '@app/drizzle/db'; import * as schema from '@app/drizzle/schema'; -import { BadRequestError, UnexpectedNotFoundError } from '@app/lib/error.ts'; +import { BadRequestError } from '@app/lib/error.ts'; +import { producer } from '@app/lib/kafka'; import { CollectionType, EpisodeCollectionStatus, SubjectType } from '@app/lib/subject/type'; import type * as memo from './memo'; @@ -19,60 +20,81 @@ import { TimelineCat, TimelineSource, TimelineStatusType } from './type'; */ export interface TimelineMessage { subject: { - userID: number; - subjectID: number; + uid: number; + subject: { + id: number; + type: SubjectType; + }; + collect: { + id: number; + type: CollectionType; + rate: number; + comment: string; + }; + createdAt: number; }; progressEpisode: { - userID: number; - subjectID: number; - episodeID: number; - status: EpisodeCollectionStatus; + uid: number; + subject: { + id: number; + type: SubjectType; + }; + episode: { + id: number; + status: EpisodeCollectionStatus; + }; + createdAt: number; }; progressSubject: { - userID: number; - subjectID: number; - epsUpdate?: number; - volsUpdate?: number; + uid: number; + subject: { + id: number; + type: SubjectType; + eps: number; + volumes: number; + }; + collect: { + epsUpdate?: number; + volsUpdate?: number; + }; + createdAt: number; }; statusTsukkomi: { - userID: number; + uid: number; text: string; + createdAt: number; }; } -export const TimelineWriter = { +type TimelineKafkaSender = { + [T in keyof TimelineMessage]: (message: TimelineMessage[T]) => Promise; +}; + +/** 写入时间轴的 Kafka Topic */ +export const AsyncTimelineWriter: TimelineKafkaSender = new Proxy({} as TimelineKafkaSender, { + get: (_, op: keyof TimelineMessage) => { + return async (message: TimelineMessage[typeof op]) => { + const value = JSON.stringify({ op, message }); + await producer.send('timeline', message.uid.toString(), value); + }; + }, +}); + +type TimelineDatabaseWriter = { + [T in keyof TimelineMessage]: (message: TimelineMessage[T]) => Promise; +}; + +/** 写入时间轴的 MySQL 数据库表 */ +export const TimelineWriter: TimelineDatabaseWriter = { /** 收藏条目 */ async subject(payload: TimelineMessage['subject']): Promise { - const [subject] = await db - .select() - .from(schema.chiiSubjects) - .where(op.eq(schema.chiiSubjects.id, payload.subjectID)) - .limit(1); - if (!subject) { - throw new UnexpectedNotFoundError('subject not found'); - } - const [interest] = await db - .select() - .from(schema.chiiSubjectInterests) - .where( - op.and( - op.eq(schema.chiiSubjectInterests.uid, payload.userID), - op.eq(schema.chiiSubjectInterests.subjectID, payload.subjectID), - op.ne(schema.chiiSubjectInterests.type, 0), - ), - ) - .limit(1); - if (!interest) { - throw new UnexpectedNotFoundError('interest not found'); - } - - const type = switchSubjectType(interest.type, subject.typeID); + const type = switchSubjectType(payload.collect.type, payload.subject.type); const [previous] = await db .select() .from(schema.chiiTimeline) .where( op.and( - op.eq(schema.chiiTimeline.uid, payload.userID), + op.eq(schema.chiiTimeline.uid, payload.uid), op.eq(schema.chiiTimeline.cat, TimelineCat.Subject), op.eq(schema.chiiTimeline.type, type), ), @@ -81,10 +103,10 @@ export const TimelineWriter = { .limit(1); const detail: memo.Subject = { - subject_id: payload.subjectID, - collect_id: interest.id, - collect_comment: lo.escape(interest.comment), - collect_rate: interest.rate, + subject_id: payload.subject.id, + collect_id: payload.collect.id, + collect_comment: lo.escape(payload.collect.comment), + collect_rate: payload.collect.rate, }; if (previous && previous.createdAt > DateTime.now().minus({ minutes: 10 }).toUnixInteger()) { @@ -111,16 +133,16 @@ export const TimelineWriter = { return previous.id; } else { const [result] = await db.insert(schema.chiiTimeline).values({ - uid: payload.userID, + uid: payload.uid, cat: TimelineCat.Subject, type, - related: payload.subjectID.toString(), + related: payload.subject.id.toString(), memo: php.stringify(detail), img: '', batch: false, source: TimelineSource.Next, replies: 0, - createdAt: DateTime.now().toUnixInteger(), + createdAt: payload.createdAt, }); return result.insertId; } @@ -128,28 +150,21 @@ export const TimelineWriter = { /** 进度 - 剧集 */ async progressEpisode(payload: TimelineMessage['progressEpisode']): Promise { - if (payload.status === EpisodeCollectionStatus.None) { + if (payload.episode.status === EpisodeCollectionStatus.None) { throw new BadRequestError('episode status is none'); } - const [subject] = await db - .select() - .from(schema.chiiSubjects) - .where(op.eq(schema.chiiSubjects.id, payload.subjectID)) - .limit(1); - if (!subject) { - throw new UnexpectedNotFoundError('subject not found'); - } + const detail: memo.ProgressSingle = { - subject_id: payload.subjectID, - subject_type_id: subject.typeID, - ep_id: payload.episodeID, + subject_id: payload.subject.id, + subject_type_id: payload.subject.type, + ep_id: payload.episode.id, }; const [previous] = await db .select() .from(schema.chiiTimeline) .where( op.and( - op.eq(schema.chiiTimeline.uid, payload.userID), + op.eq(schema.chiiTimeline.uid, payload.uid), op.eq(schema.chiiTimeline.cat, TimelineCat.Progress), ), ) @@ -157,10 +172,10 @@ export const TimelineWriter = { .limit(1); if ( previous && - previous.createdAt > DateTime.now().minus({ minutes: 15 }).toUnixInteger() && - Number(previous.related) === payload.subjectID && + previous.createdAt > DateTime.now().minus({ minutes: 10 }).toUnixInteger() && + Number(previous.related) === payload.subject.id && !previous.batch && - previous.type === payload.status + previous.type === payload.episode.status ) { await db .update(schema.chiiTimeline) @@ -173,16 +188,16 @@ export const TimelineWriter = { return previous.id; } else { const [result] = await db.insert(schema.chiiTimeline).values({ - uid: payload.userID, + uid: payload.uid, cat: TimelineCat.Progress, - type: payload.status, - related: payload.subjectID.toString(), + type: payload.episode.status, + related: payload.subject.id.toString(), memo: php.stringify(detail), img: '', batch: false, source: TimelineSource.Next, replies: 0, - createdAt: DateTime.now().toUnixInteger(), + createdAt: payload.createdAt, }); return result.insertId; } @@ -190,34 +205,26 @@ export const TimelineWriter = { /** 进度 - 条目 */ async progressSubject(payload: TimelineMessage['progressSubject']) { - const [subject] = await db - .select() - .from(schema.chiiSubjects) - .where(op.eq(schema.chiiSubjects.id, payload.subjectID)) - .limit(1); - if (!subject) { - throw new UnexpectedNotFoundError('subject not found'); - } const detail: memo.ProgressBatch = { - subject_id: payload.subjectID, - subject_type_id: subject.typeID, - eps_total: subject.eps === 0 ? '??' : subject.eps.toString(), - eps_update: payload.epsUpdate, - vols_total: subject.volumes === 0 ? '??' : subject.volumes.toString(), - vols_update: payload.volsUpdate, + subject_id: payload.subject.id, + subject_type_id: payload.subject.type, + eps_total: payload.subject.eps === 0 ? '??' : payload.subject.eps.toString(), + eps_update: payload.collect.epsUpdate, + vols_total: payload.subject.volumes === 0 ? '??' : payload.subject.volumes.toString(), + vols_update: payload.collect.volsUpdate, }; const [previous] = await db .select() .from(schema.chiiTimeline) .where( op.and( - op.eq(schema.chiiTimeline.uid, payload.userID), + op.eq(schema.chiiTimeline.uid, payload.uid), op.eq(schema.chiiTimeline.cat, TimelineCat.Progress), op.eq(schema.chiiTimeline.type, 0), ), ) .limit(1); - if (previous && previous.createdAt > DateTime.now().minus({ minutes: 15 }).toUnixInteger()) { + if (previous && previous.createdAt > DateTime.now().minus({ minutes: 10 }).toUnixInteger()) { await db .update(schema.chiiTimeline) .set({ @@ -229,16 +236,16 @@ export const TimelineWriter = { return previous.id; } else { const [result] = await db.insert(schema.chiiTimeline).values({ - uid: payload.userID, + uid: payload.uid, cat: TimelineCat.Progress, type: 0, - related: payload.subjectID.toString(), + related: payload.subject.id.toString(), memo: php.stringify(detail), img: '', batch: false, source: TimelineSource.Next, replies: 0, - createdAt: DateTime.now().toUnixInteger(), + createdAt: payload.createdAt, }); return result.insertId; } @@ -247,7 +254,7 @@ export const TimelineWriter = { /** 状态 - 吐槽 */ async statusTsukkomi(payload: TimelineMessage['statusTsukkomi']) { const [result] = await db.insert(schema.chiiTimeline).values({ - uid: payload.userID, + uid: payload.uid, cat: TimelineCat.Status, type: TimelineStatusType.Tsukkomi, related: '', @@ -256,7 +263,7 @@ export const TimelineWriter = { batch: false, source: TimelineSource.Next, replies: 0, - createdAt: DateTime.now().toUnixInteger(), + createdAt: payload.createdAt, }); return result.insertId; }, diff --git a/routes/private/routes/timeline.ts b/routes/private/routes/timeline.ts index 4c34e7bae..974af8e26 100644 --- a/routes/private/routes/timeline.ts +++ b/routes/private/routes/timeline.ts @@ -1,5 +1,6 @@ import { Type as t } from '@sinclair/typebox'; import * as lo from 'lodash-es'; +import { DateTime } from 'luxon'; import { NotAllowedError } from '@app/lib/auth'; import { Dam } from '@app/lib/dam'; @@ -102,7 +103,11 @@ export async function setup(app: App) { } await rateLimit(LimitAction.Timeline, auth.userID); - const id = await TimelineWriter.statusTsukkomi({ userID: auth.userID, text }); + const id = await TimelineWriter.statusTsukkomi({ + uid: auth.userID, + text, + createdAt: DateTime.now().toUnixInteger(), + }); return { id }; }, );