Skip to content

Commit

Permalink
refactor: add heartbeat params responseTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyib committed Sep 12, 2024
1 parent 1c77917 commit 9e32b81
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 65 deletions.
37 changes: 27 additions & 10 deletions packages/hooks/src/useWebSocket/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,36 +149,53 @@ describe('useWebSocket', () => {
jest.spyOn(global, 'clearInterval');

const wsServer = new WS(wsUrl);
renderHook(() => useWebSocket(wsUrl, { heartbeat: { interval: 100 } }));
renderHook(() =>
useWebSocket(wsUrl, {
heartbeat: {
interval: 100,
responseTimeout: 200,
},
}),
);

// Called on mount
expect(clearInterval).toHaveBeenCalledTimes(1);

await act(async () => {
await wsServer.connected;
await sleep(110);
return promise;
});
expect(wsServer.messages).toStrictEqual(['ping']);

await expect(wsServer).toReceiveMessage('ping');

await expect(wsServer).toReceiveMessage('ping');

expect(wsServer).toHaveReceivedMessages(['ping', 'ping']);
await act(async () => {
await sleep(110);
});
expect(wsServer.messages).toStrictEqual(['ping', 'ping']);

act(() => wsServer.close());
expect(clearInterval).toHaveBeenCalledTimes(1);
await act(async () => {
wsServer.close();
await wsServer.closed;
await sleep(110);
return promise;
});
expect(clearInterval).toHaveBeenCalledTimes(1);
expect(clearInterval).toHaveBeenCalledTimes(2);
});

// TODO: 更详细的测试心跳相关的所有参数
// TODO: 心跳逻辑 demo 完善、demo 测试

it('should ignore heartbeat response message', async () => {
const wsServer = new WS(wsUrl);
const hooks = renderHook(() =>
useWebSocket(wsUrl, { heartbeat: { interval: 100, returnMessage: 'pong' } }),
useWebSocket(wsUrl, { heartbeat: { interval: 100, responseMessage: 'pong' } }),
);

await act(async () => {
await wsServer.connected;
return promise;
});

await expect(wsServer).toReceiveMessage('ping');

act(() => {
Expand Down
61 changes: 61 additions & 0 deletions packages/hooks/src/useWebSocket/demo/demo2.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import React, { useRef, useMemo } from 'react';
import { useWebSocket } from 'ahooks';

enum ReadyState {
Connecting = 0,
Open = 1,
Closing = 2,
Closed = 3,
}

export default () => {
const messageHistory = useRef<any[]>([]);

const { readyState, sendMessage, latestMessage, disconnect, connect } = useWebSocket(
'wss://ws.postman-echo.com/raw',
{
heartbeat: {
interval: 1000 * 3,
},
},
);

messageHistory.current = useMemo(
() => messageHistory.current.concat(latestMessage),
[latestMessage],
);

return (
<div>
{/* send message */}
<button
onClick={() => sendMessage && sendMessage(`${Date.now()}`)}
disabled={readyState !== ReadyState.Open}
style={{ marginRight: 8 }}
>
✉️ send
</button>
{/* disconnect */}
<button
onClick={() => disconnect && disconnect()}
disabled={readyState !== ReadyState.Open}
style={{ marginRight: 8 }}
>
❌ disconnect
</button>
{/* connect */}
<button onClick={() => connect && connect()} disabled={readyState === ReadyState.Open}>
{readyState === ReadyState.Connecting ? 'connecting' : '📞 connect'}
</button>
<div style={{ marginTop: 8 }}>readyState: {readyState}</div>
<div style={{ marginTop: 8 }}>
<p>received message: </p>
{messageHistory.current.map((message, index) => (
<p key={index} style={{ wordWrap: 'break-word' }}>
{message?.data}
</p>
))}
</div>
</div>
);
};
10 changes: 5 additions & 5 deletions packages/hooks/src/useWebSocket/index.en-US.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ useWebSocket(socketUrl: string, options?: Options): Result;

### HeartbeatOptions

| 参数 | 说明 | 类型 | 默认值 |
| ------------- | -------------------------------------------------------------------- | -------- | ------ |
| message | Heartbeat message | `string` | `ping` |
| returnMessage | Heartbeat response message, `latestMessage` will ignore this message | `string` | - |
| interval | Heartbeat Interval(ms) | `number` | `6000` |
| 参数 | 说明 | 类型 | 默认值 |
| --------------- | -------------------------------------------------------------------- | -------- | ------ |
| message | Heartbeat message | `string` | `ping` |
| responseMessage | Heartbeat response message, `latestMessage` will ignore this message | `string` | - |
| interval | Heartbeat Interval(ms) | `number` | `6000` |

### Result

Expand Down
104 changes: 60 additions & 44 deletions packages/hooks/src/useWebSocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ import { useEffect, useRef, useState } from 'react';
import useLatest from '../useLatest';
import useMemoizedFn from '../useMemoizedFn';
import useUnmount from '../useUnmount';
import isObject from 'lodash/isObject';
import isNil from 'lodash/isNil';

const DEFAULT_MESSAGE = {
PING: 'ping',
PONG: 'pong',
};

export enum ReadyState {
Connecting = 0,
Expand All @@ -10,10 +17,13 @@ export enum ReadyState {
Closed = 3,
}

export type HeartbeatMessage = Parameters<WebSocket['send']>[0];

export interface HeartbeatOptions {
message?: string;
returnMessage?: string;
message?: HeartbeatMessage;
responseMessage?: HeartbeatMessage;
interval?: number;
responseTimeout?: number;
}

export interface Options {
Expand Down Expand Up @@ -50,8 +60,12 @@ export default function useWebSocket(socketUrl: string, options: Options = {}):
heartbeat = false,
} = options;

const { message: heartbeatMessage = 'ping', interval: heartbeatInterval = 60 * 1000 } =
typeof heartbeat === 'object' ? heartbeat : {};
const {
message: heartbeatMessage = DEFAULT_MESSAGE.PING,
responseMessage = DEFAULT_MESSAGE.PONG,
interval = 1000,
responseTimeout = 1000,
} = isObject(heartbeat) ? heartbeat : {};

const onOpenRef = useLatest(onOpen);
const onCloseRef = useLatest(onClose);
Expand All @@ -62,6 +76,7 @@ export default function useWebSocket(socketUrl: string, options: Options = {}):
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
const websocketRef = useRef<WebSocket>();
const heartbeatTimerRef = useRef<ReturnType<typeof setInterval>>();
const heartbeatTimeoutRef = useRef<ReturnType<typeof setTimeout>>();

const [latestMessage, setLatestMessage] = useState<WebSocketEventMap['message']>();
const [readyState, setReadyState] = useState<ReadyState>(ReadyState.Closed);
Expand All @@ -71,10 +86,7 @@ export default function useWebSocket(socketUrl: string, options: Options = {}):
reconnectTimesRef.current < reconnectLimit &&
websocketRef.current?.readyState !== ReadyState.Open
) {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
}

clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = setTimeout(() => {
// eslint-disable-next-line @typescript-eslint/no-use-before-define
connectWs();
Expand All @@ -83,15 +95,26 @@ export default function useWebSocket(socketUrl: string, options: Options = {}):
}
};

const connectWs = () => {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
}
// Status code 1000 -> Normal Closure: https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent/code
const disconnect: WebSocket['close'] = (code = 1000, reason) => {
clearTimeout(reconnectTimerRef.current);
clearInterval(heartbeatTimerRef.current);
clearTimeout(heartbeatTimeoutRef.current);

reconnectTimesRef.current = reconnectLimit;
websocketRef.current?.close(code, reason);
websocketRef.current = undefined;
};

if (websocketRef.current) {
websocketRef.current.close();
const sendMessage: WebSocket['send'] = (message) => {
if (readyState === ReadyState.Open) {
websocketRef.current?.send(message);
} else {
throw new Error('WebSocket disconnected');
}
};

const connectWs = () => {
const ws = new WebSocket(socketUrl, protocols);
setReadyState(ReadyState.Connecting);

Expand All @@ -109,68 +132,61 @@ export default function useWebSocket(socketUrl: string, options: Options = {}):
}
onOpenRef.current?.(event, ws);
reconnectTimesRef.current = 0;
setReadyState(ws.readyState || ReadyState.Open);

if (heartbeat) {
heartbeatTimerRef.current = setInterval(() => {
ws.send(heartbeatMessage);
}, heartbeatInterval);
if (ws.readyState === ReadyState.Open) {
ws.send(heartbeatMessage);
}
if (!isNil(heartbeatTimeoutRef.current)) {
return;
}

heartbeatTimeoutRef.current = setTimeout(() => {
disconnect();
}, responseTimeout);
}, interval);
}
setReadyState(ws.readyState || ReadyState.Open);
};
ws.onmessage = (message: WebSocketEventMap['message']) => {
if (websocketRef.current !== ws) {
return;
}
if (heartbeat && typeof heartbeat !== 'boolean' && heartbeat.returnMessage === message.data) {
return;
if (heartbeat) {
clearTimeout(heartbeatTimeoutRef.current);

if (responseMessage === message.data) {
return;
}
}

onMessageRef.current?.(message, ws);
setLatestMessage(message);
};

ws.onclose = (event) => {
onCloseRef.current?.(event, ws);
// closed by server
if (websocketRef.current === ws) {
// ws 关闭后,如果设置了超时重试的参数,则等待重试间隔时间后重试
reconnect();
}
// closed by disconnect or closed by server
if (!websocketRef.current || websocketRef.current === ws) {
setReadyState(ws.readyState || ReadyState.Closed);
if (heartbeat) {
clearInterval(heartbeatTimerRef.current);
}
}
};

websocketRef.current = ws;
};

const sendMessage: WebSocket['send'] = (message) => {
if (readyState === ReadyState.Open) {
websocketRef.current?.send(message);
} else {
throw new Error('WebSocket disconnected');
}
};

const connect = () => {
disconnect();
reconnectTimesRef.current = 0;
connectWs();
};

const disconnect = () => {
if (reconnectTimerRef.current) {
clearTimeout(reconnectTimerRef.current);
}

if (heartbeatTimerRef.current) {
clearInterval(heartbeatTimerRef.current);
}

reconnectTimesRef.current = reconnectLimit;
websocketRef.current?.close();
websocketRef.current = undefined;
};

useEffect(() => {
if (!manual && socketUrl) {
connect();
Expand Down
16 changes: 10 additions & 6 deletions packages/hooks/src/useWebSocket/index.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ nav:

<code src="./demo/demo1.tsx" />

### 测试示例

<code src="./demo/demo2.tsx" />

## API

```typescript
Expand All @@ -25,7 +29,7 @@ enum ReadyState {

interface HeartbeatOptions{
message?: string;
returnMessage?: string;
responseMessage?: string;
interval?: number;
}

Expand Down Expand Up @@ -75,11 +79,11 @@ useWebSocket(socketUrl: string, options?: Options): Result;

### HeartbeatOptions

| 参数 | 说明 | 类型 | 默认值 |
| ------------- | ------------------------------------------ | -------- | ------ |
| message | 心跳消息 | `string` | `ping` |
| returnMessage | 心跳回复消息,`latestMessage` 会忽略该消息 | `string` | - |
| interval | 心跳时间间隔(ms) | `number` | `6000` |
| 参数 | 说明 | 类型 | 默认值 |
| --------------- | ------------------------------------------ | -------- | ------ |
| message | 心跳消息 | `string` | `ping` |
| responseMessage | 心跳回复消息,`latestMessage` 会忽略该消息 | `string` | `pong` |
| interval | 心跳时间间隔(ms) | `number` | `6000` |

### Result

Expand Down

0 comments on commit 9e32b81

Please sign in to comment.