From 325dace4377ccd8d53e26f3e0b706876fdaf4e20 Mon Sep 17 00:00:00 2001 From: Timo <16718859+toger5@users.noreply.github.com> Date: Thu, 14 Nov 2024 14:21:20 +0100 Subject: [PATCH] Fix local echo in embedded mode (#4498) * fix local echo * dont use custome event emitter anymore * move logic into updateTxId * temp testing * use generic eventEmtitter names * add tests --------- Co-authored-by: Robin Co-authored-by: Hugh Nimmo-Smith --- spec/unit/embedded.spec.ts | 147 ++++++++++++++++++++++- src/embedded.ts | 67 ++++++++++- src/matrixrtc/MatrixRTCSessionManager.ts | 2 +- 3 files changed, 213 insertions(+), 3 deletions(-) diff --git a/spec/unit/embedded.spec.ts b/spec/unit/embedded.spec.ts index b09d3a27097..69b3c43827e 100644 --- a/spec/unit/embedded.spec.ts +++ b/spec/unit/embedded.spec.ts @@ -30,6 +30,7 @@ import { ITurnServer, IRoomEvent, IOpenIDCredentials, + ISendEventFromWidgetResponseData, WidgetApiResponseError, } from "matrix-widget-api"; @@ -40,6 +41,7 @@ import { ICapabilities, RoomWidgetClient } from "../../src/embedded"; import { MatrixEvent } from "../../src/models/event"; import { ToDeviceBatch } from "../../src/models/ToDeviceMessage"; import { DeviceInfo } from "../../src/crypto/deviceinfo"; +import { sleep } from "../../src/utils"; const testOIDCToken = { access_token: "12345678", @@ -127,9 +129,16 @@ describe("RoomWidgetClient", () => { const makeClient = async ( capabilities: ICapabilities, sendContentLoaded: boolean | undefined = undefined, + userId?: string, ): Promise => { const baseUrl = "https://example.org"; - client = createRoomWidgetClient(widgetApi, capabilities, "!1:example.org", { baseUrl }, sendContentLoaded); + client = createRoomWidgetClient( + widgetApi, + capabilities, + "!1:example.org", + { baseUrl, userId }, + sendContentLoaded, + ); expect(widgetApi.start).toHaveBeenCalled(); // needs to have been called early in order to not miss messages widgetApi.emit("ready"); await client.startClient(); @@ -192,6 +201,142 @@ describe("RoomWidgetClient", () => { .map((e) => e.getEffectiveEvent()), ).toEqual([event]); }); + describe("local echos", () => { + const setupRemoteEcho = () => { + makeClient( + { + receiveEvent: ["org.matrix.rageshake_request"], + sendEvent: ["org.matrix.rageshake_request"], + }, + undefined, + "@me:example.org", + ); + expect(widgetApi.requestCapabilityForRoomTimeline).toHaveBeenCalledWith("!1:example.org"); + expect(widgetApi.requestCapabilityToReceiveEvent).toHaveBeenCalledWith("org.matrix.rageshake_request"); + const injectSpy = jest.spyOn((client as any).syncApi, "injectRoomEvents"); + const widgetSendEmitter = new EventEmitter(); + const widgetSendPromise = new Promise((resolve) => + widgetSendEmitter.once("send", () => resolve()), + ); + const resolveWidgetSend = () => widgetSendEmitter.emit("send"); + widgetApi.sendRoomEvent.mockImplementation( + async (eType, content, roomId): Promise => { + await widgetSendPromise; + return { room_id: "!1:example.org", event_id: "event_id" }; + }, + ); + return { injectSpy, resolveWidgetSend }; + }; + const remoteEchoEvent = new CustomEvent(`action:${WidgetApiToWidgetAction.SendEvent}`, { + detail: { + data: { + type: "org.matrix.rageshake_request", + + room_id: "!1:example.org", + event_id: "event_id", + sender: "@me:example.org", + state_key: "bar", + content: { hello: "world" }, + unsigned: { transaction_id: "1234" }, + }, + }, + cancelable: true, + }); + it("get response then local echo", async () => { + await sleep(600); + const { injectSpy, resolveWidgetSend } = await setupRemoteEcho(); + + // Begin by sending an event: + client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId"); + // we do not expect it to be send -- until we call `resolveWidgetSend` + expect(injectSpy).not.toHaveBeenCalled(); + + // We first get the response from the widget + resolveWidgetSend(); + // We then get the remote echo from the widget + widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent); + + // gets emitted after the event got injected + await new Promise((resolve) => client.once(ClientEvent.Event, () => resolve())); + expect(injectSpy).toHaveBeenCalled(); + + const call = injectSpy.mock.calls[0] as any; + const injectedEv = call[2][0]; + expect(injectedEv.getType()).toBe("org.matrix.rageshake_request"); + expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId"); + }); + + it("get local echo then response", async () => { + await sleep(600); + const { injectSpy, resolveWidgetSend } = await setupRemoteEcho(); + + // Begin by sending an event: + client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId"); + // we do not expect it to be send -- until we call `resolveWidgetSend` + expect(injectSpy).not.toHaveBeenCalled(); + + // We first get the remote echo from the widget + widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent); + expect(injectSpy).not.toHaveBeenCalled(); + + // We then get the response from the widget + resolveWidgetSend(); + + // Gets emitted after the event got injected + await new Promise((resolve) => client.once(ClientEvent.Event, () => resolve())); + expect(injectSpy).toHaveBeenCalled(); + + const call = injectSpy.mock.calls[0] as any; + const injectedEv = call[2][0]; + expect(injectedEv.getType()).toBe("org.matrix.rageshake_request"); + expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId"); + }); + it("__ local echo then response", async () => { + await sleep(600); + const { injectSpy, resolveWidgetSend } = await setupRemoteEcho(); + + // Begin by sending an event: + client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId"); + // we do not expect it to be send -- until we call `resolveWidgetSend` + expect(injectSpy).not.toHaveBeenCalled(); + + // We first get the remote echo from the widget + widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent); + const otherRemoteEcho = new CustomEvent(`action:${WidgetApiToWidgetAction.SendEvent}`, { + detail: { data: { ...remoteEchoEvent.detail.data } }, + }); + otherRemoteEcho.detail.data.unsigned.transaction_id = "4567"; + otherRemoteEcho.detail.data.event_id = "other_id"; + widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, otherRemoteEcho); + + // Simulate the wait time while the widget is waiting for a response + // after we already received the remote echo + await sleep(20); + // even after the wait we do not want any event to be injected. + // we do not know their event_id and cannot know if they are the remote echo + // where we need to update the txId because they are send by this client + expect(injectSpy).not.toHaveBeenCalled(); + // We then get the response from the widget + resolveWidgetSend(); + + // Gets emitted after the event got injected + await new Promise((resolve) => client.once(ClientEvent.Event, () => resolve())); + // Now we want both events to be injected since we know the txId - event_id match + expect(injectSpy).toHaveBeenCalled(); + + // it has been called with the event sent by ourselves + const call = injectSpy.mock.calls[0] as any; + const injectedEv = call[2][0]; + expect(injectedEv.getType()).toBe("org.matrix.rageshake_request"); + expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId"); + + // It has been called by the event we blocked because of our send right afterwards + const call2 = injectSpy.mock.calls[1] as any; + const injectedEv2 = call2[2][0]; + expect(injectedEv2.getType()).toBe("org.matrix.rageshake_request"); + expect(injectedEv2.getUnsigned().transaction_id).toBe("4567"); + }); + }); it("handles widget errors with generic error data", async () => { const error = new Error("failed to send"); diff --git a/src/embedded.ts b/src/embedded.ts index bc32398e3b5..a92607fc31c 100644 --- a/src/embedded.ts +++ b/src/embedded.ts @@ -57,6 +57,7 @@ import { ToDeviceBatch, ToDevicePayload } from "./models/ToDeviceMessage.ts"; import { DeviceInfo } from "./crypto/deviceinfo.ts"; import { IOlmDevice } from "./crypto/algorithms/megolm.ts"; import { MapWithDefault, recursiveMapToObject } from "./utils.ts"; +import { TypedEventEmitter } from "./matrix.ts"; interface IStateEventRequest { eventType: string; @@ -123,6 +124,10 @@ export interface ICapabilities { updateDelayedEvents?: boolean; } +export enum RoomWidgetClientEvent { + PendingEventsChanged = "PendingEvent.pendingEventsChanged", +} +export type EventHandlerMap = { [RoomWidgetClientEvent.PendingEventsChanged]: () => void }; /** * A MatrixClient that routes its requests through the widget API instead of the * real CS API. @@ -134,6 +139,9 @@ export class RoomWidgetClient extends MatrixClient { private lifecycle?: AbortController; private syncState: SyncState | null = null; + private pendingSendingEventsTxId: { type: string; id: string | undefined; txId: string }[] = []; + private eventEmitter = new TypedEventEmitter(); + /** * * @param widgetApi - The widget api to use for communication. @@ -330,6 +338,8 @@ export class RoomWidgetClient extends MatrixClient { const content = event.event.redacts ? { ...event.getContent(), redacts: event.event.redacts } : event.getContent(); + + // Delayed event special case. if (delayOpts) { // TODO: updatePendingEvent for delayed events? const response = await this.widgetApi.sendRoomEvent( @@ -342,6 +352,10 @@ export class RoomWidgetClient extends MatrixClient { return this.validateSendDelayedEventResponse(response); } + const txId = event.getTxnId(); + // Add the txnId to the pending list (still with unknown evID) + if (txId) this.pendingSendingEventsTxId.push({ type: event.getType(), id: undefined, txId }); + let response: ISendEventFromWidgetResponseData; try { response = await this.widgetApi.sendRoomEvent(event.getType(), content, room.roomId); @@ -349,9 +363,15 @@ export class RoomWidgetClient extends MatrixClient { this.updatePendingEventStatus(room, event, EventStatus.NOT_SENT); throw e; } - // This also checks for an event id on the response room.updatePendingEvent(event, EventStatus.SENT, response.event_id); + + // Update the pending events list with the eventId + this.pendingSendingEventsTxId.forEach((p) => { + if (p.txId === txId) p.id = response.event_id; + }); + this.eventEmitter.emit(RoomWidgetClientEvent.PendingEventsChanged); + return { event_id: response.event_id! }; } @@ -495,6 +515,48 @@ export class RoomWidgetClient extends MatrixClient { await this.widgetApi.transport.reply(ev.detail, {}); } + private updateTxId = async (event: MatrixEvent): Promise => { + // We update the txId for remote echos that originate from this client. + // This happens with the help of `pendingSendingEventsTxId` where we store all events that are currently sending + // with their widget txId and once ready the final evId. + if ( + // This could theoretically be an event send by this device + // In that case we need to update the txId of the event because the embedded client/widget + // knows this event with a different transaction Id than what was used by the host client. + event.getSender() === this.getUserId() && + // We optimize by not blocking events from types that we have not send + // with this client. + this.pendingSendingEventsTxId.some((p) => event.getType() === p.type) + ) { + // Compare by event Id if we have a matching pending event where we know the txId. + let matchingTxId = this.pendingSendingEventsTxId.find((p) => p.id === event.getId())?.txId; + // Block any further processing of this event until we have received the sending response. + // -> until we know the event id. + // -> until we have not pending events anymore. + while (!matchingTxId && this.pendingSendingEventsTxId.length > 0) { + // Recheck whenever the PendingEventsChanged + await new Promise((resolve) => + this.eventEmitter.once(RoomWidgetClientEvent.PendingEventsChanged, () => resolve()), + ); + matchingTxId = this.pendingSendingEventsTxId.find((p) => p.id === event.getId())?.txId; + } + + // We found the correct txId: we update the event and delete the entry of the pending events. + if (matchingTxId) { + event.setTxnId(matchingTxId); + event.setUnsigned({ ...event.getUnsigned(), transaction_id: matchingTxId }); + } + this.pendingSendingEventsTxId = this.pendingSendingEventsTxId.filter((p) => p.id !== event.getId()); + + // Emit once there are no pending events anymore to release all other events that got + // awaited in the `while (!matchingTxId && this.pendingSendingEventsTxId.length > 0)` loop + // but are not send by this client. + if (this.pendingSendingEventsTxId.length === 0) { + this.eventEmitter.emit(RoomWidgetClientEvent.PendingEventsChanged); + } + } + }; + private onEvent = async (ev: CustomEvent): Promise => { ev.preventDefault(); @@ -502,6 +564,9 @@ export class RoomWidgetClient extends MatrixClient { // send us events from other rooms if this widget is always on screen if (ev.detail.data.room_id === this.roomId) { const event = new MatrixEvent(ev.detail.data as Partial); + + // Only inject once we have update the txId + await this.updateTxId(event); await this.syncApi!.injectRoomEvents(this.room!, [], [event]); this.emit(ClientEvent.Event, event); this.setSyncState(SyncState.Syncing); diff --git a/src/matrixrtc/MatrixRTCSessionManager.ts b/src/matrixrtc/MatrixRTCSessionManager.ts index e64d19f7a6e..ee0cbb4d886 100644 --- a/src/matrixrtc/MatrixRTCSessionManager.ts +++ b/src/matrixrtc/MatrixRTCSessionManager.ts @@ -56,7 +56,7 @@ export class MatrixRTCSessionManager extends TypedEventEmitter 0) {