From 9bf42e46cc4a06ea18d8259c0761466ad241b518 Mon Sep 17 00:00:00 2001 From: Gunwoo Baik Date: Mon, 30 Sep 2024 20:03:27 +0900 Subject: [PATCH] Add configurable retry mechanism to broadcast interface (#901) Implement a new BroadcastOptions interface to allow for automatic retries on network failures during broadcasts. This enhancement improves resilience against temporary network issues, ensuring more reliable message delivery. The maxRetries option allows users to control retry behavior, with a default of 0 (no retries). Only network errors trigger retries; other errors, such as unserializable payloads, will not initiate retry attempts. --- packages/sdk/src/client/client.ts | 98 +++++++++--- packages/sdk/src/document/document.ts | 159 ++++++++++--------- packages/sdk/test/integration/client_test.ts | 107 ++++++++++++- 3 files changed, 261 insertions(+), 103 deletions(-) diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index bae869c59..9a25cb7bc 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -43,7 +43,7 @@ import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation'; import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor'; import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor'; import { validateSerializable } from '../util/validator'; -import { Json } from '@yorkie-js-sdk/src/document/document'; +import { Json, BroadcastOptions } from '@yorkie-js-sdk/src/document/document'; /** * `SyncMode` defines synchronization modes for the PushPullChanges API. @@ -161,6 +161,15 @@ const DefaultClientOptions = { reconnectStreamDelay: 1000, }; +/** + * `DefaultBroadcastOptions` is the default options for broadcast. + */ +const DefaultBroadcastOptions = { + maxRetries: Infinity, + initialRetryInterval: 1000, + maxBackoff: 20000, +}; + /** * `Client` is a normal client that can communicate with the server. * It has documents and sends changes of the documents in local @@ -307,12 +316,13 @@ export class Client { doc.update((_, p) => p.set(options.initialPresence || {})); const unsubscribeBroacastEvent = doc.subscribe( 'local-broadcast', - (event) => { + async (event) => { const { topic, payload } = event.value; - const errorFn = event.error; + const errorFn = event.options?.error; + const options = event.options; try { - this.broadcast(doc.getKey(), topic, payload); + await this.broadcast(doc.getKey(), topic, payload, options); } catch (error: unknown) { if (error instanceof Error) { errorFn?.(error); @@ -609,6 +619,7 @@ export class Client { docKey: DocumentKey, topic: string, payload: Json, + options?: BroadcastOptions, ): Promise { if (!this.isActive()) { throw new YorkieError( @@ -631,28 +642,63 @@ export class Client { ); } - return this.enqueueTask(async () => { - return this.rpcClient - .broadcast( - { - clientId: this.id!, - documentId: attachment.docID, - topic, - payload: new TextEncoder().encode(JSON.stringify(payload)), - }, - { headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } }, - ) - .then(() => { - logger.info( - `[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`, - ); - }) - .catch((err) => { - logger.error(`[BC] c:"${this.getKey()}" err :`, err); - this.handleConnectError(err); - throw err; - }); - }); + const maxRetries = + options?.maxRetries ?? DefaultBroadcastOptions.maxRetries; + const maxBackoff = DefaultBroadcastOptions.maxBackoff; + + let retryCount = 0; + + const exponentialBackoff = (retryCount: number) => { + const retryInterval = Math.min( + DefaultBroadcastOptions.initialRetryInterval * 2 ** retryCount, + maxBackoff, + ); + return retryInterval; + }; + + const doLoop = async (): Promise => { + return this.enqueueTask(async () => { + return this.rpcClient + .broadcast( + { + clientId: this.id!, + documentId: attachment.docID, + topic, + payload: new TextEncoder().encode(JSON.stringify(payload)), + }, + { headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } }, + ) + .then(() => { + logger.info( + `[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`, + ); + }) + .catch((err) => { + logger.error(`[BC] c:"${this.getKey()}" err:`, err); + if (this.handleConnectError(err)) { + if (retryCount < maxRetries) { + retryCount++; + setTimeout(() => doLoop(), exponentialBackoff(retryCount - 1)); + logger.info( + `[BC] c:"${this.getKey()}" retry attempt ${retryCount}/${maxRetries}`, + ); + } else { + logger.error( + `[BC] c:"${this.getKey()}" exceeded maximum retry attempts`, + ); + + // Stop retrying after maxRetries + throw err; + } + } else { + // Stop retrying if the error is not retryable + throw err; + } + }); + }); + }; + + return doLoop(); } /** diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 311f91d3c..b0452c83e 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -79,6 +79,23 @@ import { History, HistoryOperation } from '@yorkie-js-sdk/src/document/history'; import { setupDevtools } from '@yorkie-js-sdk/src/devtools'; import * as Devtools from '@yorkie-js-sdk/src/devtools/types'; +/** + * `BroadcastOptions` are the options to create a new document. + * + * @public + */ +export interface BroadcastOptions { + /** + * `error` is called when an error occurs. + */ + error?: ErrorFn; + + /** + * `maxRetries` is the maximum number of retries. + */ + maxRetries?: number; +} + /** * `DocumentOptions` are the options to create a new document. * @@ -386,13 +403,13 @@ export interface PresenceChangedEvent

export interface BroadcastEvent extends BaseDocEvent { type: DocEventType.Broadcast; value: { clientID: ActorID; topic: string; payload: Json }; - error?: ErrorFn; + options?: BroadcastOptions; } export interface LocalBroadcastEvent extends BaseDocEvent { type: DocEventType.LocalBroadcast; value: { topic: string; payload: any }; - error?: ErrorFn; + options?: BroadcastOptions; } type DocEventCallbackMap

= { @@ -450,14 +467,14 @@ export type DocumentKey = string; type OperationInfoOfElement = TElement extends Text ? TextOperationInfo : TElement extends Counter - ? CounterOperationInfo - : TElement extends Tree - ? TreeOperationInfo - : TElement extends BaseArray - ? ArrayOperationInfo - : TElement extends BaseObject - ? ObjectOperationInfo - : OperationInfo; + ? CounterOperationInfo + : TElement extends Tree + ? TreeOperationInfo + : TElement extends BaseArray + ? ArrayOperationInfo + : TElement extends BaseObject + ? ObjectOperationInfo + : OperationInfo; /** * `OperationInfoOfInternal` represents the type of the operation info of the @@ -478,24 +495,24 @@ type OperationInfoOfInternal< > = TDepth extends 0 ? TElement : TKeyOrPath extends `${infer TFirst}.${infer TRest}` - ? TFirst extends keyof TElement - ? TElement[TFirst] extends BaseArray - ? OperationInfoOfInternal< - TElement[TFirst], - number, - DecreasedDepthOf - > - : OperationInfoOfInternal< - TElement[TFirst], - TRest, - DecreasedDepthOf - > - : OperationInfo - : TKeyOrPath extends keyof TElement - ? TElement[TKeyOrPath] extends BaseArray - ? ArrayOperationInfo - : OperationInfoOfElement - : OperationInfo; + ? TFirst extends keyof TElement + ? TElement[TFirst] extends BaseArray + ? OperationInfoOfInternal< + TElement[TFirst], + number, + DecreasedDepthOf + > + : OperationInfoOfInternal< + TElement[TFirst], + TRest, + DecreasedDepthOf + > + : OperationInfo + : TKeyOrPath extends keyof TElement + ? TElement[TKeyOrPath] extends BaseArray + ? ArrayOperationInfo + : OperationInfoOfElement + : OperationInfo; /** * `DecreasedDepthOf` represents the type of the decreased depth of the given depth. @@ -503,24 +520,24 @@ type OperationInfoOfInternal< type DecreasedDepthOf = Depth extends 10 ? 9 : Depth extends 9 - ? 8 - : Depth extends 8 - ? 7 - : Depth extends 7 - ? 6 - : Depth extends 6 - ? 5 - : Depth extends 5 - ? 4 - : Depth extends 4 - ? 3 - : Depth extends 3 - ? 2 - : Depth extends 2 - ? 1 - : Depth extends 1 - ? 0 - : -1; + ? 8 + : Depth extends 8 + ? 7 + : Depth extends 7 + ? 6 + : Depth extends 6 + ? 5 + : Depth extends 5 + ? 4 + : Depth extends 4 + ? 3 + : Depth extends 3 + ? 2 + : Depth extends 2 + ? 1 + : Depth extends 1 + ? 0 + : -1; /** * `PathOfInternal` represents the type of the path of the given element. @@ -532,29 +549,29 @@ type PathOfInternal< > = Depth extends 0 ? Prefix : TElement extends Record - ? { - [TKey in keyof TElement]: TElement[TKey] extends LeafElement - ? `${Prefix}${TKey & string}` - : TElement[TKey] extends BaseArray - ? - | `${Prefix}${TKey & string}` - | `${Prefix}${TKey & string}.${number}` - | PathOfInternal< - TArrayElement, - `${Prefix}${TKey & string}.${number}.`, - DecreasedDepthOf - > - : - | `${Prefix}${TKey & string}` - | PathOfInternal< - TElement[TKey], - `${Prefix}${TKey & string}.`, - DecreasedDepthOf - >; - }[keyof TElement] - : Prefix extends `${infer TRest}.` - ? TRest - : Prefix; + ? { + [TKey in keyof TElement]: TElement[TKey] extends LeafElement + ? `${Prefix}${TKey & string}` + : TElement[TKey] extends BaseArray + ? + | `${Prefix}${TKey & string}` + | `${Prefix}${TKey & string}.${number}` + | PathOfInternal< + TArrayElement, + `${Prefix}${TKey & string}.${number}.`, + DecreasedDepthOf + > + : + | `${Prefix}${TKey & string}` + | PathOfInternal< + TElement[TKey], + `${Prefix}${TKey & string}.`, + DecreasedDepthOf + >; + }[keyof TElement] + : Prefix extends `${infer TRest}.` + ? TRest + : Prefix; /** * `OperationInfoOf` represents the type of the operation info of the given @@ -2070,11 +2087,11 @@ export class Document { /** * `broadcast` the payload to the given topic. */ - public broadcast(topic: string, payload: Json, error?: ErrorFn) { + public broadcast(topic: string, payload: Json, options?: BroadcastOptions) { const broadcastEvent: LocalBroadcastEvent = { type: DocEventType.LocalBroadcast, value: { topic, payload }, - error, + options, }; this.publish([broadcastEvent]); diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index 2d485e386..cf325c647 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -33,6 +33,7 @@ import { } from '@yorkie-js-sdk/test/integration/integration_helper'; import { ConnectError, Code as ConnectCode } from '@connectrpc/connect'; import { Code, YorkieError } from '@yorkie-js-sdk/src/util/error'; +import { Json } from '@yorkie-js-sdk/src/document/document'; describe.sequential('Client', function () { afterEach(() => { @@ -902,7 +903,9 @@ describe.sequential('Client', function () { // @ts-ignore // Disable type checking for testing purposes - doc.broadcast(broadcastTopic, payload, errorHandler); + doc.broadcast(broadcastTopic, payload, { + error: errorHandler, + }); await eventCollector.waitAndVerifyNthEvent(1, broadcastErrMessage); @@ -914,7 +917,7 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector<[string, any]>(); + const eventCollector = new EventCollector<[string, Json]>(); const broadcastTopic = 'test'; const unsubscribe = d2.subscribe('broadcast', (event) => { const { topic, payload } = event.value; @@ -945,7 +948,7 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector<[string, any]>(); + const eventCollector = new EventCollector<[string, Json]>(); const broadcastTopic1 = 'test1'; const broadcastTopic2 = 'test2'; @@ -980,7 +983,7 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector = new EventCollector<[string, any]>(); + const eventCollector = new EventCollector<[string, Json]>(); const broadcastTopic = 'test'; const unsubscribe = d2.subscribe('broadcast', (event) => { const { topic, payload } = event.value; @@ -1018,8 +1021,8 @@ describe.sequential('Client', function () { }) => { await withTwoClientsAndDocuments<{ t: Text }>( async (c1, d1, c2, d2) => { - const eventCollector1 = new EventCollector<[string, any]>(); - const eventCollector2 = new EventCollector<[string, any]>(); + const eventCollector1 = new EventCollector<[string, Json]>(); + const eventCollector2 = new EventCollector<[string, Json]>(); const broadcastTopic = 'test'; const payload = { a: 1, b: '2' }; @@ -1058,4 +1061,96 @@ describe.sequential('Client', function () { SyncMode.Realtime, ); }); + + it('Should retry broadcasting on network failure with retry option and succeeds when network is restored', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, Json]>(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); + + // 01. Simulate Unknown error. + vi.stubGlobal('fetch', async () => { + throw new ConnectError('Failed to fetch', ConnectCode.Unknown); + }); + await new Promise((res) => setTimeout(res, 30)); + + const payload = { a: 1, b: '2' }; + + d1.broadcast(broadcastTopic, payload); + + // Failed to broadcast due to network failure + await new Promise((res) => setTimeout(res, 3000)); + assert.equal(eventCollector.getLength(), 0); + + // 02. Back to normal condition + vi.unstubAllGlobals(); + + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); + + it('Should not retry broadcasting on network failure when maxRetries is set to zero', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, any]>(); + const eventCollector2 = new EventCollector(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); + + // 01. Simulate Unknown error. + vi.stubGlobal('fetch', async () => { + throw new ConnectError('Failed to fetch', ConnectCode.Unknown); + }); + + await new Promise((res) => setTimeout(res, 30)); + + const payload = { a: 1, b: '2' }; + + const errorHandler = (error: Error) => { + if (error instanceof ConnectError) { + eventCollector2.add(error.code); + } + }; + + d1.broadcast(broadcastTopic, payload, { + error: errorHandler, + maxRetries: 0, + }); + + // 02. Back to normal condition + vi.unstubAllGlobals(); + + await eventCollector2.waitAndVerifyNthEvent(1, ConnectCode.Unknown); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); });