diff --git a/apps/api/src/yjs/v2/persistors.ts b/apps/api/src/yjs/v2/persistors.ts index e3941b1b..d69390aa 100644 --- a/apps/api/src/yjs/v2/persistors.ts +++ b/apps/api/src/yjs/v2/persistors.ts @@ -52,7 +52,7 @@ export interface Persistor { export class DocumentPersistor implements Persistor { constructor(private readonly documentId: string) {} - private applyUpdate(ydoc: Y.Doc, update: Buffer) { + private applyUpdate(ydoc: Y.Doc, update: Buffer | Uint8Array) { Y.applyUpdate(ydoc, update) } @@ -62,11 +62,6 @@ export class DocumentPersistor implements Persistor { const ydoc = new Y.Doc() const dbDoc = await (tx ?? prisma()).yjsDocument.findUnique({ where: { documentId: this.documentId }, - include: { - yjsUpdates: { - orderBy: { createdAt: 'asc' }, - }, - }, }) if (!dbDoc) { @@ -77,44 +72,70 @@ export class DocumentPersistor implements Persistor { } } - const updates = dbDoc.yjsUpdates.filter( - (update) => update.clock === dbDoc.clock - ) - logger().trace( - { - documentId: this.documentId, + const update = await (tx ?? prisma()).yjsUpdate.findFirst({ + where: { + yjsDocumentId: dbDoc.id, clock: dbDoc.clock, - updates: updates.length, }, - 'Applying updates to Yjs document' - ) - - this.applyUpdate(ydoc, dbDoc.state) - for (const update of updates) { + select: { update: true }, + orderBy: { createdAt: 'desc' }, + }) + if (update) { this.applyUpdate(ydoc, update.update) } - if (updates.length > 100) { + const updatesCount = await (tx ?? prisma()).yjsUpdate.count({ + where: { + yjsDocumentId: dbDoc.id, + clock: { lte: dbDoc.clock }, + }, + }) + + if (updatesCount > 100) { + logger().trace( + { + id, + documentId: this.documentId, + clock: dbDoc.clock, + updates: updatesCount, + }, + 'Too many updates, cleaning up' + ) const cleanUpdates = async (tx: PrismaTransaction) => { - await tx.yjsUpdate.deleteMany({ + const deleted = await tx.yjsUpdate.deleteMany({ where: { - id: { in: updates.map((update) => update.id) }, + clock: { lte: dbDoc.clock }, }, }) - await tx.yjsDocument.update({ - where: { documentId: this.documentId }, + await tx.yjsUpdate.create({ data: { - state: Buffer.from(Y.encodeStateAsUpdate(ydoc)), + yjsDocumentId: dbDoc.id, + update: Buffer.from(Y.encodeStateAsUpdate(ydoc)), + clock: dbDoc.clock, }, }) + + return deleted.count } + let deleted = 0 if (tx) { - await cleanUpdates(tx) + deleted = await cleanUpdates(tx) } else { - await prisma().$transaction(cleanUpdates) + deleted = await prisma().$transaction(cleanUpdates) } + + logger().trace( + { + id, + documentId: this.documentId, + clock: dbDoc.clock, + updates: updatesCount, + deleted, + }, + 'Finished cleaning up updates' + ) } return { @@ -138,6 +159,7 @@ export class DocumentPersistor implements Persistor { select: { id: true, clock: true }, where: { documentId: this.documentId }, }) + const isNew = !yjsDoc if (!yjsDoc) { yjsDoc = await prisma().yjsDocument.upsert({ where: { documentId: this.documentId }, @@ -150,6 +172,41 @@ export class DocumentPersistor implements Persistor { }) } + if (!isNew) { + const updatesCount = await prisma().yjsUpdate.count({ + where: { + yjsDocumentId: yjsDoc.id, + }, + }) + + if (updatesCount > 100) { + logger().trace( + { + documentId: this.documentId, + clock: yjsDoc.clock, + updates: updatesCount, + }, + 'Too many updates, cleaning up' + ) + await prisma().yjsUpdate.deleteMany({ + where: { + OR: [ + { yjsDocumentId: yjsDoc.id }, + { clock: { lt: yjsDoc.clock } }, + ], + }, + }) + logger().trace( + { + documentId: this.documentId, + clock: yjsDoc.clock, + updates: updatesCount, + }, + 'Finished cleaning up updates' + ) + } + } + const { id } = await prisma().yjsUpdate.create({ data: { yjsDocumentId: yjsDoc.id, @@ -224,9 +281,6 @@ export class AppPersistor implements Persistor { createdAt: 'desc', }, include: { - yjsUpdates: { - orderBy: { createdAt: 'asc' }, - }, userYjsAppDocuments: { where: { // use a new uuid when there is no user @@ -235,11 +289,6 @@ export class AppPersistor implements Persistor { // be manipulating the published state userId: this.userId ?? uuidv4(), }, - include: { - yjsUpdates: { - orderBy: { createdAt: 'asc' }, - }, - }, }, }, }) @@ -249,34 +298,45 @@ export class AppPersistor implements Persistor { doc: | { _tag: 'user'; userYjsAppDocument: UserYjsAppDocument } | { _tag: 'app'; yjsAppDocument: YjsAppDocument }, - updates: YjsUpdate[], + updates: Pick[], tx: PrismaTransaction ) => { - await tx.yjsUpdate.deleteMany({ + const deleted = await tx.yjsUpdate.deleteMany({ where: { - id: { in: updates.map((update) => update.id) }, + OR: [ + { id: { in: updates.map((update) => update.id) } }, + { + clock: { + lt: + doc._tag === 'user' + ? doc.userYjsAppDocument.clock + : doc.yjsAppDocument.clock, + }, + }, + ], }, }) if (doc._tag === 'user') { - await tx.userYjsAppDocument.update({ - where: { - yjsAppDocumentId_userId: { - yjsAppDocumentId: this.yjsAppDocumentId, - userId: doc.userYjsAppDocument.userId, - }, - }, + await tx.yjsUpdate.create({ data: { - state: Buffer.from(Y.encodeStateAsUpdate(ydoc)), + userYjsAppDocumentUserId: doc.userYjsAppDocument.userId, + userYjsAppDocumentYjsAppDocumentId: + doc.userYjsAppDocument.yjsAppDocumentId, + update: Buffer.from(Y.encodeStateAsUpdate(ydoc)), + clock: doc.userYjsAppDocument.clock, }, }) } else { - await tx.yjsAppDocument.update({ - where: { id: this.yjsAppDocumentId }, + await tx.yjsUpdate.create({ data: { - state: Buffer.from(Y.encodeStateAsUpdate(ydoc)), + yjsAppDocumentId: doc.yjsAppDocument.id, + update: Buffer.from(Y.encodeStateAsUpdate(ydoc)), + clock: doc.yjsAppDocument.clock, }, }) } + + return deleted.count } const ydoc = new Y.Doc() @@ -285,67 +345,109 @@ export class AppPersistor implements Persistor { let clock = userYjsAppDoc?.clock ?? yjsAppDoc.clock if (!userYjsAppDoc || !this.userId) { // no user or user never opened the app before. bind to the published state - const updates = yjsAppDoc.yjsUpdates.filter( - (update) => update.clock === yjsAppDoc.clock - ) + + const update = await tx.yjsUpdate.findFirst({ + where: { + yjsAppDocumentId: this.yjsAppDocumentId, + clock: yjsAppDoc.clock, + }, + select: { update: true }, + orderBy: { createdAt: 'desc' }, + }) + this.applyUpdate(ydoc, yjsAppDoc.state) - for (const update of updates) { + if (update) { this.applyUpdate(ydoc, update.update) } - if (updates.length > 100) { - if (tx) { - await cleanUpdates( - ydoc, - { - _tag: 'app', - yjsAppDocument: yjsAppDoc, - }, - updates, - tx - ) - } else { - await prisma().$transaction((tx) => - cleanUpdates( - ydoc, - { _tag: 'app', yjsAppDocument: yjsAppDoc }, - updates, - tx - ) - ) - } + const updatesCount = await tx.yjsUpdate.count({ + where: { + yjsAppDocumentId: this.yjsAppDocumentId, + clock: { lte: yjsAppDoc.clock }, + }, + }) + if (updatesCount > 100) { + logger().trace( + { + id, + yjsAppDocumentId: this.yjsAppDocumentId, + userId: this.userId, + clock: yjsAppDoc.clock, + updates: updatesCount, + }, + 'Too many updates, cleaning up' + ) + const deleted = await cleanUpdates( + ydoc, + { _tag: 'app', yjsAppDocument: yjsAppDoc }, + [], + tx + ) + logger().trace( + { + id, + yjsAppDocumentId: this.yjsAppDocumentId, + userId: this.userId, + clock: yjsAppDoc.clock, + updates: updatesCount, + deleted, + }, + 'Finished cleaning up updates' + ) } } else { // bind to the user's state - const updates = userYjsAppDoc.yjsUpdates.filter( - (update) => update.clock === userYjsAppDoc.clock - ) + const update = await tx.yjsUpdate.findFirst({ + where: { + userYjsAppDocumentUserId: this.userId, + userYjsAppDocumentYjsAppDocumentId: this.yjsAppDocumentId, + clock: userYjsAppDoc.clock, + }, + select: { update: true }, + orderBy: { createdAt: 'desc' }, + }) + this.applyUpdate(ydoc, userYjsAppDoc.state) - for (const update of updates) { + if (update) { this.applyUpdate(ydoc, update.update) } - if (updates.length > 100) { - if (tx) { - await cleanUpdates( - ydoc, - { - _tag: 'user', - userYjsAppDocument: userYjsAppDoc, - }, - updates, - tx - ) - } else { - await prisma().$transaction((tx) => - cleanUpdates( - ydoc, - { _tag: 'user', userYjsAppDocument: userYjsAppDoc }, - updates, - tx - ) - ) - } + const updatesCount = await tx.yjsUpdate.count({ + where: { + userYjsAppDocumentUserId: this.userId, + userYjsAppDocumentYjsAppDocumentId: this.yjsAppDocumentId, + clock: { lte: userYjsAppDoc.clock }, + }, + }) + + if (updatesCount > 100) { + logger().trace( + { + id, + yjsAppDocumentId: this.yjsAppDocumentId, + userId: this.userId, + clock: userYjsAppDoc.clock, + updates: updatesCount, + }, + 'Too many updates, cleaning up' + ) + const deleted = await cleanUpdates( + ydoc, + { _tag: 'user', userYjsAppDocument: userYjsAppDoc }, + [], + tx + ) + logger().trace( + { + id, + yjsAppDocumentId: this.yjsAppDocumentId, + userId: this.userId, + clock: userYjsAppDoc.clock, + updates: updatesCount, + deleted, + }, + 'Finished cleaning up updates' + ) } } @@ -393,6 +495,39 @@ export class AppPersistor implements Persistor { update: {}, }) + const updatesCount = await prisma().yjsUpdate.count({ + where: { + userYjsAppDocumentUserId: this.userId, + userYjsAppDocumentYjsAppDocumentId: this.yjsAppDocumentId, + }, + }) + if (updatesCount > 100) { + logger().trace( + { + yjsAppDocumentId: this.yjsAppDocumentId, + userId: this.userId, + clock: doc.clock, + updates: updatesCount, + }, + 'Too many updates, cleaning up' + ) + await prisma().yjsUpdate.deleteMany({ + where: { + userYjsAppDocumentUserId: this.userId, + userYjsAppDocumentYjsAppDocumentId: this.yjsAppDocumentId, + }, + }) + logger().trace( + { + yjsAppDocumentId: this.yjsAppDocumentId, + userId: this.userId, + clock: doc.clock, + updates: updatesCount, + }, + 'Finished cleaning up updates' + ) + } + const { id } = await prisma().yjsUpdate.create({ data: { userYjsAppDocumentUserId: this.userId, @@ -405,6 +540,35 @@ export class AppPersistor implements Persistor { return id } + const updatesCount = await prisma().yjsUpdate.count({ + where: { + yjsAppDocumentId: this.yjsAppDocumentId, + }, + }) + if (updatesCount > 100) { + logger().trace( + { + yjsAppDocumentId: this.yjsAppDocumentId, + clock: doc.clock, + updates: updatesCount, + }, + 'Too many updates, cleaning up' + ) + await prisma().yjsUpdate.deleteMany({ + where: { + yjsAppDocumentId: this.yjsAppDocumentId, + }, + }) + logger().trace( + { + yjsAppDocumentId: this.yjsAppDocumentId, + clock: doc.clock, + updates: updatesCount, + }, + 'Finished cleaning up updates' + ) + } + const { id } = await prisma().yjsUpdate.create({ data: { yjsAppDocumentId: this.yjsAppDocumentId,