diff --git a/packages/sdk/src/client/attachment.ts b/packages/sdk/src/client/attachment.ts index 1a020570d..18aec7899 100644 --- a/packages/sdk/src/client/attachment.ts +++ b/packages/sdk/src/client/attachment.ts @@ -1,5 +1,6 @@ import { Document, Indexable } from '@yorkie-js-sdk/src/document/document'; import { SyncMode } from '@yorkie-js-sdk/src/client/client'; +import { Unsubscribe } from '../yorkie'; /** * `WatchStream` is a stream that watches the changes of the document. @@ -21,17 +22,21 @@ export class Attachment { watchLoopTimerID?: ReturnType; watchAbortController?: AbortController; + unsubscribeBroadcastEvent: Unsubscribe; + constructor( reconnectStreamDelay: number, doc: Document, docID: string, syncMode: SyncMode, + unsubscribeBroacastEvent: Unsubscribe, ) { this.reconnectStreamDelay = reconnectStreamDelay; this.doc = doc; this.docID = docID; this.syncMode = syncMode; this.remoteChangeEventReceived = false; + this.unsubscribeBroadcastEvent = unsubscribeBroacastEvent; } /** diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index 03338bc80..aa406ed40 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -42,6 +42,7 @@ import { import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation'; import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor'; import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor'; +import { validateSerializable } from '../util/validator'; /** * `SyncMode` defines synchronization modes for the PushPullChanges API. @@ -303,6 +304,21 @@ export class Client { } doc.setActor(this.id!); doc.update((_, p) => p.set(options.initialPresence || {})); + const unsubscribeBroacastEvent = doc.subscribe( + 'local-broadcast', + (event) => { + const { topic, payload } = event.value; + const errorFn = event.error; + + try { + this.broadcast(doc.getKey(), topic, payload); + } catch (error: unknown) { + if (error instanceof Error) { + errorFn?.(error); + } + } + }, + ); const syncMode = options.syncMode ?? SyncMode.Realtime; return this.enqueueTask(async () => { @@ -329,6 +345,7 @@ export class Client { doc, res.documentId, syncMode, + unsubscribeBroacastEvent, ), ); @@ -584,6 +601,59 @@ export class Client { return this.conditions[condition]; } + /** + * `broadcast` broadcasts the given payload to the given topic. + */ + public broadcast( + docKey: DocumentKey, + topic: string, + payload: any, + ): Promise { + if (!this.isActive()) { + throw new YorkieError( + Code.ErrClientNotActivated, + `${this.key} is not active`, + ); + } + const attachment = this.attachmentMap.get(docKey); + if (!attachment) { + throw new YorkieError( + Code.ErrDocumentNotAttached, + `${docKey} is not attached`, + ); + } + + if (!validateSerializable(payload)) { + throw new YorkieError( + Code.ErrInvalidArgument, + 'payload is not serializable', + ); + } + + return this.enqueueTask(async () => { + return this.rpcClient + .broadcast( + { + clientId: this.id!, + documentId: attachment.docID, + topic, + payload: new TextEncoder().encode(JSON.stringify(payload)), + }, + { headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } }, + ) + .then(() => { + logger.info( + `[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`, + ); + }) + .catch((err) => { + logger.error(`[BC] c:"${this.getKey()}" err :`, err); + this.handleConnectError(err); + throw err; + }); + }); + } + /** * `runSyncLoop` runs the sync loop. The sync loop pushes local changes to * the server and pulls remote changes from the server. @@ -748,6 +818,7 @@ export class Client { } attachment.cancelWatchStream(); + attachment.unsubscribeBroadcastEvent(); this.attachmentMap.delete(docKey); } diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 9343b6353..39007afe5 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -173,6 +173,16 @@ export enum DocEventType { * `PresenceChanged` means that the presences of the client has updated. */ PresenceChanged = 'presence-changed', + + /** + * `Broadcast` means that the broadcast event is received from the remote client. + */ + Broadcast = 'broadcast', + + /** + * `LocalBroadcast` means that the broadcast event is sent from the local client. + */ + LocalBroadcast = 'local-broadcast', } /** @@ -191,7 +201,9 @@ export type DocEvent

= | InitializedEvent

| WatchedEvent

| UnwatchedEvent

- | PresenceChangedEvent

; + | PresenceChangedEvent

+ | BroadcastEvent + | LocalBroadcastEvent; /** * `TransactionEvent` represents document events that occur within @@ -371,6 +383,18 @@ export interface PresenceChangedEvent

value: { clientID: ActorID; presence: P }; } +export interface BroadcastEvent extends BaseDocEvent { + type: DocEventType.Broadcast; + value: { clientID: ActorID; topic: string; payload: any }; + error?: ErrorFn; +} + +export interface LocalBroadcastEvent extends BaseDocEvent { + type: DocEventType.LocalBroadcast; + value: { topic: string; payload: any }; + error?: ErrorFn; +} + type DocEventCallbackMap

= { default: NextFn< | SnapshotEvent @@ -388,6 +412,8 @@ type DocEventCallbackMap

= { connection: NextFn; status: NextFn; sync: NextFn; + broadcast: NextFn; + 'local-broadcast': NextFn; all: NextFn>; }; export type DocEventTopic = keyof DocEventCallbackMap; @@ -818,6 +844,24 @@ export class Document { error?: ErrorFn, complete?: CompleteFn, ): Unsubscribe; + /** + * `subscribe` registers a callback to subscribe to events on the document. + * The callback will be called when the broadcast event is received from the remote client. + */ + public subscribe( + type: 'broadcast', + next: DocEventCallbackMap

['broadcast'], + error?: ErrorFn, + ): Unsubscribe; + /** + * `subscribe` registers a callback to subscribe to events on the document. + * The callback will be called when the local client sends a broadcast event. + */ + public subscribe( + type: 'local-broadcast', + next: DocEventCallbackMap

['local-broadcast'], + error?: ErrorFn, + ): Unsubscribe; /** * `subscribe` registers a callback to subscribe to events on the document. */ @@ -966,6 +1010,30 @@ export class Document { arg4, ); } + if (arg1 === 'local-broadcast') { + const callback = arg2 as DocEventCallbackMap

['local-broadcast']; + return this.eventStream.subscribe((event) => { + for (const docEvent of event) { + if (docEvent.type !== DocEventType.LocalBroadcast) { + continue; + } + + callback(docEvent); + } + }, arg3); + } + if (arg1 === 'broadcast') { + const callback = arg2 as DocEventCallbackMap

['broadcast']; + return this.eventStream.subscribe((event) => { + for (const docEvent of event) { + if (docEvent.type !== DocEventType.Broadcast) { + continue; + } + + callback(docEvent); + } + }, arg3); + } if (arg1 === 'all') { const callback = arg2 as DocEventCallbackMap

['all']; return this.eventStream.subscribe(callback, arg3, arg4); @@ -1024,6 +1092,7 @@ export class Document { complete, ); } + throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`); } @@ -1468,7 +1537,8 @@ export class Document { if (resp.body.case === 'event') { const { type, publisher } = resp.body.value; - const event: Array | UnwatchedEvent

> = []; + const event: Array | UnwatchedEvent

| BroadcastEvent> = + []; if (type === PbDocEventType.DOCUMENT_WATCHED) { this.addOnlineClient(publisher); // NOTE(chacha912): We added to onlineClients, but we won't trigger watched event @@ -1495,6 +1565,20 @@ export class Document { value: { clientID: publisher, presence }, }); } + } else if (type === PbDocEventType.DOCUMENT_BROADCAST) { + if (resp.body.value.body) { + const { topic, payload } = resp.body.value.body; + const decoder = new TextDecoder(); + + event.push({ + type: DocEventType.Broadcast, + value: { + clientID: publisher, + topic, + payload: JSON.parse(decoder.decode(payload)), + }, + }); + } } if (event.length > 0) { @@ -1970,4 +2054,17 @@ export class Document { public getRedoStackForTest(): Array>> { return this.internalHistory.getRedoStackForTest(); } + + /** + * `broadcast` the payload to the given topic. + */ + public broadcast(topic: string, payload: any, error?: ErrorFn) { + const broadcastEvent: LocalBroadcastEvent = { + type: DocEventType.LocalBroadcast, + value: { topic, payload }, + error, + }; + + this.publish([broadcastEvent]); + } } diff --git a/packages/sdk/src/util/validator.ts b/packages/sdk/src/util/validator.ts new file mode 100644 index 000000000..33625d837 --- /dev/null +++ b/packages/sdk/src/util/validator.ts @@ -0,0 +1,31 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * `validateSerializable` returns whether the given value is serializable or not. + */ +export const validateSerializable = (value: any): boolean => { + try { + const serialized = JSON.stringify(value); + + if (serialized === undefined) { + return false; + } + } catch (error) { + return false; + } + return true; +}; diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index b315408f4..004e0b506 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -863,4 +863,197 @@ describe.sequential('Client', function () { assert.equal(d1.toSortedJSON(), d2.toSortedJSON()); }, task.name); }); + + it('Should successfully broadcast serializeable payload', async ({ + task, + }) => { + const cli = new yorkie.Client(testRPCAddr); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); + await cli.attach(doc); + + const broadcastTopic = 'test'; + const payload = { a: 1, b: '2' }; + + expect(async () => doc.broadcast(broadcastTopic, payload)).not.toThrow(); + + await cli.deactivate(); + }); + + it('Should throw error when broadcasting unserializeable payload', async ({ + task, + }) => { + const eventCollector = new EventCollector(); + const cli = new yorkie.Client(testRPCAddr); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); + await cli.attach(doc); + + // broadcast unserializable payload + const payload = () => {}; + const broadcastTopic = 'test'; + const broadcastErrMessage = 'payload is not serializable'; + + const errorHandler = (error: Error) => { + eventCollector.add(error.message); + }; + + doc.broadcast(broadcastTopic, payload, errorHandler); + + await eventCollector.waitAndVerifyNthEvent(1, broadcastErrMessage); + + await cli.deactivate(); + }); + + it('Should trigger the handler for a subscribed broadcast event', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, any]>(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); + + const payload = { a: 1, b: '2' }; + d1.broadcast(broadcastTopic, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + assert.equal(eventCollector.getLength(), 1); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); + + it('Should not trigger the handler for an unsubscribed broadcast event', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, any]>(); + const broadcastTopic1 = 'test1'; + const broadcastTopic2 = 'test2'; + + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic1) { + eventCollector.add([topic, payload]); + } else if (topic === broadcastTopic2) { + eventCollector.add([topic, payload]); + } + }); + + const payload = { a: 1, b: '2' }; + d1.broadcast(broadcastTopic1, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic1, + payload, + ]); + + assert.equal(eventCollector.getLength(), 1); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); + + it('Should not trigger the handler for a broadcast event after unsubscribing', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, any]>(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); + + const payload = { a: 1, b: '2' }; + + d1.broadcast(broadcastTopic, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + unsubscribe(); + + d1.broadcast(broadcastTopic, payload); + + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); + + // No change in the number of calls + assert.equal(eventCollector.getLength(), 1); + }, + task.name, + SyncMode.Realtime, + ); + }); + + it('Should not trigger the handler for a broadcast event sent by the publisher to itself', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector1 = new EventCollector<[string, any]>(); + const eventCollector2 = new EventCollector<[string, any]>(); + const broadcastTopic = 'test'; + const payload = { a: 1, b: '2' }; + + // Publisher subscribes to the broadcast event + const unsubscribe1 = d1.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector1.add([topic, payload]); + } + }); + + const unsubscribe2 = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector2.add([topic, payload]); + } + }); + + d1.broadcast(broadcastTopic, payload); + + // Assuming that D2 takes longer to receive the broadcast event compared to D1 + await eventCollector2.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + unsubscribe1(); + unsubscribe2(); + + assert.equal(eventCollector1.getLength(), 0); + assert.equal(eventCollector2.getLength(), 1); + }, + task.name, + SyncMode.Realtime, + ); + }); }); diff --git a/packages/sdk/test/integration/integration_helper.ts b/packages/sdk/test/integration/integration_helper.ts index 7db44e2ad..f0189e390 100644 --- a/packages/sdk/test/integration/integration_helper.ts +++ b/packages/sdk/test/integration/integration_helper.ts @@ -21,6 +21,7 @@ export async function withTwoClientsAndDocuments( d2: Document, ) => Promise, title: string, + syncMode: SyncMode = SyncMode.Manual, ): Promise { const client1 = new yorkie.Client(testRPCAddr); const client2 = new yorkie.Client(testRPCAddr); @@ -31,8 +32,8 @@ export async function withTwoClientsAndDocuments( const doc1 = new yorkie.Document(docKey); const doc2 = new yorkie.Document(docKey); - await client1.attach(doc1, { syncMode: SyncMode.Manual }); - await client2.attach(doc2, { syncMode: SyncMode.Manual }); + await client1.attach(doc1, { syncMode }); + await client2.attach(doc2, { syncMode }); await callback(client1, doc1, client2, doc2);