Skip to content

Commit

Permalink
Add configurable retry mechanism to broadcast interface (#901)
Browse files Browse the repository at this point in the history
Implement a new BroadcastOptions interface to allow for automatic retries
on network failures during broadcasts. This enhancement improves resilience
against temporary network issues, ensuring more reliable message delivery.
The maxRetries option allows users to control retry behavior, with a default
of 0 (no retries). Only network errors trigger retries; other errors, such
as unserializable payloads, will not initiate retry attempts.
  • Loading branch information
gwbaik9717 authored Sep 30, 2024
1 parent 4d5416f commit 9bf42e4
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 103 deletions.
98 changes: 72 additions & 26 deletions packages/sdk/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation';
import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor';
import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor';
import { validateSerializable } from '../util/validator';
import { Json } from '@yorkie-js-sdk/src/document/document';
import { Json, BroadcastOptions } from '@yorkie-js-sdk/src/document/document';

/**
* `SyncMode` defines synchronization modes for the PushPullChanges API.
Expand Down Expand Up @@ -161,6 +161,15 @@ const DefaultClientOptions = {
reconnectStreamDelay: 1000,
};

/**
* `DefaultBroadcastOptions` is the default options for broadcast.
*/
const DefaultBroadcastOptions = {
maxRetries: Infinity,
initialRetryInterval: 1000,
maxBackoff: 20000,
};

/**
* `Client` is a normal client that can communicate with the server.
* It has documents and sends changes of the documents in local
Expand Down Expand Up @@ -307,12 +316,13 @@ export class Client {
doc.update((_, p) => p.set(options.initialPresence || {}));
const unsubscribeBroacastEvent = doc.subscribe(
'local-broadcast',
(event) => {
async (event) => {
const { topic, payload } = event.value;
const errorFn = event.error;
const errorFn = event.options?.error;
const options = event.options;

try {
this.broadcast(doc.getKey(), topic, payload);
await this.broadcast(doc.getKey(), topic, payload, options);
} catch (error: unknown) {
if (error instanceof Error) {
errorFn?.(error);
Expand Down Expand Up @@ -609,6 +619,7 @@ export class Client {
docKey: DocumentKey,
topic: string,
payload: Json,
options?: BroadcastOptions,
): Promise<void> {
if (!this.isActive()) {
throw new YorkieError(
Expand All @@ -631,28 +642,63 @@ export class Client {
);
}

return this.enqueueTask(async () => {
return this.rpcClient
.broadcast(
{
clientId: this.id!,
documentId: attachment.docID,
topic,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } },
)
.then(() => {
logger.info(
`[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`,
);
})
.catch((err) => {
logger.error(`[BC] c:"${this.getKey()}" err :`, err);
this.handleConnectError(err);
throw err;
});
});
const maxRetries =
options?.maxRetries ?? DefaultBroadcastOptions.maxRetries;
const maxBackoff = DefaultBroadcastOptions.maxBackoff;

let retryCount = 0;

const exponentialBackoff = (retryCount: number) => {
const retryInterval = Math.min(
DefaultBroadcastOptions.initialRetryInterval * 2 ** retryCount,
maxBackoff,
);
return retryInterval;
};

const doLoop = async (): Promise<any> => {
return this.enqueueTask(async () => {
return this.rpcClient
.broadcast(
{
clientId: this.id!,
documentId: attachment.docID,
topic,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } },
)
.then(() => {
logger.info(
`[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`,
);
})
.catch((err) => {
logger.error(`[BC] c:"${this.getKey()}" err:`, err);
if (this.handleConnectError(err)) {
if (retryCount < maxRetries) {
retryCount++;
setTimeout(() => doLoop(), exponentialBackoff(retryCount - 1));
logger.info(
`[BC] c:"${this.getKey()}" retry attempt ${retryCount}/${maxRetries}`,
);
} else {
logger.error(
`[BC] c:"${this.getKey()}" exceeded maximum retry attempts`,
);

// Stop retrying after maxRetries
throw err;
}
} else {
// Stop retrying if the error is not retryable
throw err;
}
});
});
};

return doLoop();
}

/**
Expand Down
159 changes: 88 additions & 71 deletions packages/sdk/src/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ import { History, HistoryOperation } from '@yorkie-js-sdk/src/document/history';
import { setupDevtools } from '@yorkie-js-sdk/src/devtools';
import * as Devtools from '@yorkie-js-sdk/src/devtools/types';

/**
* `BroadcastOptions` are the options to create a new document.
*
* @public
*/
export interface BroadcastOptions {
/**
* `error` is called when an error occurs.
*/
error?: ErrorFn;

/**
* `maxRetries` is the maximum number of retries.
*/
maxRetries?: number;
}

/**
* `DocumentOptions` are the options to create a new document.
*
Expand Down Expand Up @@ -386,13 +403,13 @@ export interface PresenceChangedEvent<P extends Indexable>
export interface BroadcastEvent extends BaseDocEvent {
type: DocEventType.Broadcast;
value: { clientID: ActorID; topic: string; payload: Json };
error?: ErrorFn;
options?: BroadcastOptions;
}

export interface LocalBroadcastEvent extends BaseDocEvent {
type: DocEventType.LocalBroadcast;
value: { topic: string; payload: any };
error?: ErrorFn;
options?: BroadcastOptions;
}

type DocEventCallbackMap<P extends Indexable> = {
Expand Down Expand Up @@ -450,14 +467,14 @@ export type DocumentKey = string;
type OperationInfoOfElement<TElement> = TElement extends Text
? TextOperationInfo
: TElement extends Counter
? CounterOperationInfo
: TElement extends Tree
? TreeOperationInfo
: TElement extends BaseArray<any>
? ArrayOperationInfo
: TElement extends BaseObject<any>
? ObjectOperationInfo
: OperationInfo;
? CounterOperationInfo
: TElement extends Tree
? TreeOperationInfo
: TElement extends BaseArray<any>
? ArrayOperationInfo
: TElement extends BaseObject<any>
? ObjectOperationInfo
: OperationInfo;

/**
* `OperationInfoOfInternal` represents the type of the operation info of the
Expand All @@ -478,49 +495,49 @@ type OperationInfoOfInternal<
> = TDepth extends 0
? TElement
: TKeyOrPath extends `${infer TFirst}.${infer TRest}`
? TFirst extends keyof TElement
? TElement[TFirst] extends BaseArray<unknown>
? OperationInfoOfInternal<
TElement[TFirst],
number,
DecreasedDepthOf<TDepth>
>
: OperationInfoOfInternal<
TElement[TFirst],
TRest,
DecreasedDepthOf<TDepth>
>
: OperationInfo
: TKeyOrPath extends keyof TElement
? TElement[TKeyOrPath] extends BaseArray<unknown>
? ArrayOperationInfo
: OperationInfoOfElement<TElement[TKeyOrPath]>
: OperationInfo;
? TFirst extends keyof TElement
? TElement[TFirst] extends BaseArray<unknown>
? OperationInfoOfInternal<
TElement[TFirst],
number,
DecreasedDepthOf<TDepth>
>
: OperationInfoOfInternal<
TElement[TFirst],
TRest,
DecreasedDepthOf<TDepth>
>
: OperationInfo
: TKeyOrPath extends keyof TElement
? TElement[TKeyOrPath] extends BaseArray<unknown>
? ArrayOperationInfo
: OperationInfoOfElement<TElement[TKeyOrPath]>
: OperationInfo;

/**
* `DecreasedDepthOf` represents the type of the decreased depth of the given depth.
*/
type DecreasedDepthOf<Depth extends number = 0> = Depth extends 10
? 9
: Depth extends 9
? 8
: Depth extends 8
? 7
: Depth extends 7
? 6
: Depth extends 6
? 5
: Depth extends 5
? 4
: Depth extends 4
? 3
: Depth extends 3
? 2
: Depth extends 2
? 1
: Depth extends 1
? 0
: -1;
? 8
: Depth extends 8
? 7
: Depth extends 7
? 6
: Depth extends 6
? 5
: Depth extends 5
? 4
: Depth extends 4
? 3
: Depth extends 3
? 2
: Depth extends 2
? 1
: Depth extends 1
? 0
: -1;

/**
* `PathOfInternal` represents the type of the path of the given element.
Expand All @@ -532,29 +549,29 @@ type PathOfInternal<
> = Depth extends 0
? Prefix
: TElement extends Record<string, any>
? {
[TKey in keyof TElement]: TElement[TKey] extends LeafElement
? `${Prefix}${TKey & string}`
: TElement[TKey] extends BaseArray<infer TArrayElement>
?
| `${Prefix}${TKey & string}`
| `${Prefix}${TKey & string}.${number}`
| PathOfInternal<
TArrayElement,
`${Prefix}${TKey & string}.${number}.`,
DecreasedDepthOf<Depth>
>
:
| `${Prefix}${TKey & string}`
| PathOfInternal<
TElement[TKey],
`${Prefix}${TKey & string}.`,
DecreasedDepthOf<Depth>
>;
}[keyof TElement]
: Prefix extends `${infer TRest}.`
? TRest
: Prefix;
? {
[TKey in keyof TElement]: TElement[TKey] extends LeafElement
? `${Prefix}${TKey & string}`
: TElement[TKey] extends BaseArray<infer TArrayElement>
?
| `${Prefix}${TKey & string}`
| `${Prefix}${TKey & string}.${number}`
| PathOfInternal<
TArrayElement,
`${Prefix}${TKey & string}.${number}.`,
DecreasedDepthOf<Depth>
>
:
| `${Prefix}${TKey & string}`
| PathOfInternal<
TElement[TKey],
`${Prefix}${TKey & string}.`,
DecreasedDepthOf<Depth>
>;
}[keyof TElement]
: Prefix extends `${infer TRest}.`
? TRest
: Prefix;

/**
* `OperationInfoOf` represents the type of the operation info of the given
Expand Down Expand Up @@ -2070,11 +2087,11 @@ export class Document<T, P extends Indexable = Indexable> {
/**
* `broadcast` the payload to the given topic.
*/
public broadcast(topic: string, payload: Json, error?: ErrorFn) {
public broadcast(topic: string, payload: Json, options?: BroadcastOptions) {
const broadcastEvent: LocalBroadcastEvent = {
type: DocEventType.LocalBroadcast,
value: { topic, payload },
error,
options,
};

this.publish([broadcastEvent]);
Expand Down
Loading

0 comments on commit 9bf42e4

Please sign in to comment.