Skip to content

Commit

Permalink
Merge branch 'main' into amplifyOutputs-deepPartial
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwinkumar6 authored Jan 29, 2025
2 parents c63d1d6 + ba025e5 commit d0f6eb4
Show file tree
Hide file tree
Showing 17 changed files with 180 additions and 77 deletions.
28 changes: 0 additions & 28 deletions .github/integ-config/integ-all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -758,13 +758,6 @@ tests:
# amplifyjs_dir: true

# INTERACTIONS
# - test_name: integ_react_interactions_react_interactions
# desc: 'React Interactions'
# framework: react
# category: interactions
# sample_name: [chatbot-component]
# spec: chatbot-component
# browser: *minimal_browser_list
- test_name: integ_react_interactions_chatbot_v1
desc: 'Chatbot V1'
framework: react
Expand All @@ -779,27 +772,6 @@ tests:
sample_name: [lex-test-component]
spec: chatbot-v2
browser: *minimal_browser_list
# - test_name: integ_angular_interactions
# desc: 'Angular Interactions'
# framework: angular
# category: interactions
# sample_name: [chatbot-component]
# spec: chatbot-component
# browser: *minimal_browser_list
# - test_name: integ_vue_interactions_vue_2_interactions
# desc: 'Vue 2 Interactions'
# framework: vue
# category: interactions
# sample_name: [chatbot-component]
# spec: chatbot-component
# browser: [chrome]
# - test_name: integ_vue_interactionsvue_3_interactions
# desc: 'Vue 3 Interactions'
# framework: vue
# category: interactions
# sample_name: [chatbot-component-vue3]
# spec: chatbot-component
# browser: [chrome]

# PREDICTIONS
- test_name: integ_react_predictions
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/callable-e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ env:
CYPRESS_GOOGLE_CLIENTID: ${{ secrets.CYPRESS_GOOGLE_CLIENTID }}
CYPRESS_GOOGLE_CLIENT_SECRET: ${{ secrets.CYPRESS_GOOGLE_CLIENT_SECRET }}
CYPRESS_GOOGLE_REFRESH_TOKEN: ${{ secrets.CYPRESS_GOOGLE_REFRESH_TOKEN }}
CYPRESS_AUTH0_CLIENTID: ${{ secrets.CYPRESS_AUTH0_CLIENTID }}
CYPRESS_AUTH0_SECRET: ${{ secrets.CYPRESS_AUTH0_SECRET }}
CYPRESS_AUTH0_AUDIENCE: ${{ secrets.CYPRESS_AUTH0_AUDIENCE }}
CYPRESS_AUTH0_DOMAIN: ${{ secrets.CYPRESS_AUTH0_DOMAIN }}

jobs:
e2e-test:
Expand Down
13 changes: 12 additions & 1 deletion eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const compat = new FlatCompat({
const customClientDtsFiles = customClientDtsBundlerConfig.entries
.map(clientBundlerConfig => clientBundlerConfig.outFile)
.filter(outFile => outFile?.length > 0)
.map(outFile => outFile.replace(__dirname + path.sep, '')) // Convert absolute path to relative path
.map(outFile => outFile.replace(__dirname + path.sep, '')); // Convert absolute path to relative path

export default [
{
Expand Down Expand Up @@ -294,4 +294,15 @@ export default [
'jsdoc/no-undefined-types': 1,
},
},
{
ignores: [
'**/**.{native,android,ios}.**',
'**/__tests__/**',
'**/packages/adapter-nextjs/**',
'**/packages/react-native/example/**',
],
rules: {
'import/no-extraneous-dependencies': 'error',
},
},
];
72 changes: 62 additions & 10 deletions packages/api-graphql/__tests__/AWSAppSyncRealTimeProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ describe('AWSAppSyncRealTimeProvider', () => {
Object.defineProperty(constants, 'RECONNECT_DELAY', {
value: 100,
});
// Reduce the keep alive heartbeat to 10ms
Object.defineProperty(constants, 'DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT', {
value: 10,
});
});

afterEach(async () => {
Expand Down Expand Up @@ -765,7 +769,7 @@ describe('AWSAppSyncRealTimeProvider', () => {
// Resolve the message delivery actions
await replaceConstant(
'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT',
5,
10,
async () => {
await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
Expand All @@ -776,17 +780,17 @@ describe('AWSAppSyncRealTimeProvider', () => {
await fakeWebSocketInterface?.startAckMessage();

await fakeWebSocketInterface?.keepAlive();
},
);

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisrupted,
]);
// Wait until the socket is automatically disconnected
await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectionDisrupted,
]);
},
);

expect(fakeWebSocketInterface?.observedConnectionStates).toContain(
CS.ConnectedPendingKeepAlive,
Expand All @@ -798,6 +802,54 @@ describe('AWSAppSyncRealTimeProvider', () => {
);
});

test('subscription observer ka is cleared if data is received', async () => {
expect.assertions(1);

const observer = provider.subscribe({
appSyncGraphqlEndpoint: 'ws://localhost:8080',
});

observer.subscribe({ error: () => {} });
// Resolve the message delivery actions
await replaceConstant(
'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT',
5,
async () => {
await fakeWebSocketInterface?.readyForUse;
await fakeWebSocketInterface?.triggerOpen();
await fakeWebSocketInterface?.handShakeMessage({
connectionTimeoutMs: 100,
});

await fakeWebSocketInterface?.startAckMessage();

await fakeWebSocketInterface?.keepAlive();

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.ConnectedPendingKeepAlive,
]);
},
);

// Send message
await fakeWebSocketInterface?.sendDataMessage({
type: MESSAGE_TYPES.DATA,
payload: { data: {} },
});

await fakeWebSocketInterface?.waitUntilConnectionStateIn([
CS.Connected,
]);

expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([
CS.Disconnected,
CS.Connecting,
CS.Connected,
CS.ConnectedPendingKeepAlive,
CS.Connected,
]);
});

test('subscription connection disruption triggers automatic reconnection', async () => {
expect.assertions(1);

Expand Down
82 changes: 55 additions & 27 deletions packages/api-graphql/src/Providers/AWSWebSocketProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
CONNECTION_INIT_TIMEOUT,
CONNECTION_STATE_CHANGE,
DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT,
DEFAULT_KEEP_ALIVE_TIMEOUT,
DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT,
MAX_DELAY_MS,
MESSAGE_TYPES,
NON_RETRYABLE_CODES,
Expand Down Expand Up @@ -83,9 +83,8 @@ export abstract class AWSWebSocketProvider {

protected awsRealTimeSocket?: WebSocket;
private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED;
private keepAliveTimeoutId?: ReturnType<typeof setTimeout>;
private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
private keepAliveAlertTimeoutId?: ReturnType<typeof setTimeout>;
private keepAliveTimestamp: number = Date.now();
private keepAliveHeartbeatIntervalId?: ReturnType<typeof setInterval>;
private promiseArray: { res(): void; rej(reason?: any): void }[] = [];
private connectionState: ConnectionState | undefined;
private readonly connectionStateMonitor = new ConnectionStateMonitor();
Expand Down Expand Up @@ -119,6 +118,7 @@ export abstract class AWSWebSocketProvider {
return new Promise<void>((resolve, reject) => {
if (this.awsRealTimeSocket) {
this.awsRealTimeSocket.onclose = (_: CloseEvent) => {
this._closeSocket();
this.subscriptionObserverMap = new Map();
this.awsRealTimeSocket = undefined;
resolve();
Expand Down Expand Up @@ -171,7 +171,7 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(
`${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`,
);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();
})
.finally(() => {
subscriptionStartInProgress = false;
Expand Down Expand Up @@ -435,7 +435,7 @@ export abstract class AWSWebSocketProvider {
this.logger.debug({ err });
const message = String(err.message ?? '');
// Resolving to give the state observer time to propogate the update
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();

// Capture the error only when the network didn't cause disruption
if (
Expand Down Expand Up @@ -544,20 +544,15 @@ export abstract class AWSWebSocketProvider {
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
} else {
this.logger.debug('closing WebSocket...');
if (this.keepAliveTimeoutId) {
clearTimeout(this.keepAliveTimeoutId);
}
if (this.keepAliveAlertTimeoutId) {
clearTimeout(this.keepAliveAlertTimeoutId);
}

const tempSocket = this.awsRealTimeSocket;
// Cleaning callbacks to avoid race condition, socket still exists
tempSocket.onclose = null;
tempSocket.onerror = null;
tempSocket.close(1000);
this.awsRealTimeSocket = undefined;
this.socketStatus = SOCKET_STATUS.CLOSED;
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();
}
}

Expand All @@ -577,13 +572,40 @@ export abstract class AWSWebSocketProvider {
errorType: string;
};

private maintainKeepAlive() {
this.keepAliveTimestamp = Date.now();
}

private keepAliveHeartbeat(connectionTimeoutMs: number) {
const currentTime = Date.now();

// Check for missed KA message
if (
currentTime - this.keepAliveTimestamp >
DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT
) {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED);
} else {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE);
}

// Recognize we are disconnected if we haven't seen messages in the keep alive timeout period
if (currentTime - this.keepAliveTimestamp > connectionTimeoutMs) {
this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT);
}
}

private _handleIncomingSubscriptionMessage(message: MessageEvent) {
if (typeof message.data !== 'string') {
return;
}

const [isData, data] = this._handleSubscriptionData(message);
if (isData) return;
if (isData) {
this.maintainKeepAlive();

return;
}

const { type, id, payload } = data;

Expand Down Expand Up @@ -632,16 +654,7 @@ export abstract class AWSWebSocketProvider {
}

if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId);
if (this.keepAliveAlertTimeoutId)
clearTimeout(this.keepAliveAlertTimeoutId);
this.keepAliveTimeoutId = setTimeout(() => {
this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT);
}, this.keepAliveTimeout);
this.keepAliveAlertTimeoutId = setTimeout(() => {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED);
}, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT);
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE);
this.maintainKeepAlive();

return;
}
Expand Down Expand Up @@ -686,13 +699,21 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(`Disconnect error: ${msg}`);

if (this.awsRealTimeSocket) {
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();
this.awsRealTimeSocket.close();
}

this.socketStatus = SOCKET_STATUS.CLOSED;
}

private _closeSocket() {
if (this.keepAliveHeartbeatIntervalId) {
clearInterval(this.keepAliveHeartbeatIntervalId);
this.keepAliveHeartbeatIntervalId = undefined;
}
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
}

private _timeoutStartSubscriptionAck(subscriptionId: string) {
const subscriptionObserver =
this.subscriptionObserverMap.get(subscriptionId);
Expand All @@ -708,7 +729,7 @@ export abstract class AWSWebSocketProvider {
subscriptionState: SUBSCRIPTION_STATUS.FAILED,
});

this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this._closeSocket();
this.logger.debug(
'timeoutStartSubscription',
JSON.stringify({ query, variables }),
Expand Down Expand Up @@ -820,6 +841,7 @@ export abstract class AWSWebSocketProvider {
this.logger.debug(`WebSocket connection error`);
};
newSocket.onclose = () => {
this._closeSocket();
reject(new Error('Connection handshake error'));
};
newSocket.onopen = () => {
Expand Down Expand Up @@ -849,6 +871,7 @@ export abstract class AWSWebSocketProvider {

this.awsRealTimeSocket.onclose = event => {
this.logger.debug(`WebSocket closed ${event.reason}`);
this._closeSocket();
reject(new Error(JSON.stringify(event)));
};

Expand Down Expand Up @@ -912,7 +935,11 @@ export abstract class AWSWebSocketProvider {
return;
}

this.keepAliveTimeout = connectionTimeoutMs;
// Set up a keep alive heartbeat for this connection
this.keepAliveHeartbeatIntervalId = setInterval(() => {
this.keepAliveHeartbeat(connectionTimeoutMs);
}, DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT);

this.awsRealTimeSocket.onmessage =
this._handleIncomingSubscriptionMessage.bind(this);

Expand All @@ -923,6 +950,7 @@ export abstract class AWSWebSocketProvider {

this.awsRealTimeSocket.onclose = event => {
this.logger.debug(`WebSocket closed ${event.reason}`);
this._closeSocket();
this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED);
};
}
Expand Down
5 changes: 5 additions & 0 deletions packages/api-graphql/src/Providers/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ export const START_ACK_TIMEOUT = 15000;
*/
export const DEFAULT_KEEP_ALIVE_TIMEOUT = 5 * 60 * 1000;

/**
* Default Time in milleseconds between monitoring checks of keep alive status
*/
export const DEFAULT_KEEP_ALIVE_HEARTBEAT_TIMEOUT = 5 * 1000;

/**
* Default Time in milleseconds to alert for missed GQL_CONNECTION_KEEP_ALIVE message
*/
Expand Down
1 change: 1 addition & 0 deletions packages/api-rest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
"devDependencies": {
"@aws-amplify/core": "6.9.2",
"@aws-amplify/react-native": "1.1.6",
"@aws-sdk/types": "3.387.0",
"typescript": "5.0.2"
},
"size-limit": [
Expand Down
Loading

0 comments on commit d0f6eb4

Please sign in to comment.