Skip to content

Commit

Permalink
fix: Upgrade to AWS SDK v3 (#1201)
Browse files Browse the repository at this point in the history
AWS SDK v2 will enter maintenance mode in September and will become
unmaintained in 2025, as announced here:
https://aws.amazon.com/blogs/developer/announcing-end-of-support-for-aws-sdk-for-javascript-v2/

Documentation for AWS SDK v3 can be found at
https://docs.aws.amazon.com/sdk-for-javascript/v3/developer-guide
however it is often not particularly helpful unless you know exactly
what you're looking for. The upgrade guide here is helpful for filling
in some of the gaps:
https://github.com/aws/aws-sdk-js-v3/blob/main/UPGRADING.md

This upgrade should not introduce behavioural changes. It removes the
peer dependency on `aws-sdk`, and I've added the new AWS SDK packages as
direct dependencies instead. This will make the update process as simple
as possible (we're still using AWS SDK v2 in most of our services) and
adding new peer dependencies would create a breaking change.

Jira: [ENG-3211]
  • Loading branch information
seb-cr authored May 8, 2024
1 parent 79a5489 commit fcfe5d6
Show file tree
Hide file tree
Showing 8 changed files with 1,433 additions and 329 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
"@types/xml2js": "^0.4.14",
"@typescript-eslint/eslint-plugin": "^5.33.0",
"@typescript-eslint/parser": "^5.33.0",
"aws-sdk": "^2.1568.0",
"eslint": "^8.57.0",
"eslint-plugin-import": "^2.25.2",
"eslint-plugin-jsdoc": "^39.3.2",
Expand All @@ -45,12 +44,13 @@
"tsconfig-paths": "^4.2.0",
"typescript": "^4.9.5"
},
"peerDependencies": {
"aws-sdk": "^2.831.0"
},
"dependencies": {
"@aws-sdk/client-lambda": "^3.540.0",
"@aws-sdk/client-s3": "^3.540.0",
"@aws-sdk/client-sqs": "^3.540.0",
"@lumigo/tracer": "^1.91.0",
"@sentry/node": "^6.19.7",
"@smithy/node-http-handler": "^2.5.0",
"@types/aws-lambda": "^8.10.134",
"alai": "1.0.3",
"async": "^3.2.5",
Expand Down
4 changes: 2 additions & 2 deletions src/models/SQSMessageModel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SQS } from 'aws-sdk';
import type { Message } from '@aws-sdk/client-sqs';

/**
* Model for message received from SQS.
Expand All @@ -24,7 +24,7 @@ export default class SQSMessageModel {

forDeletion = false;

constructor(message: SQS.Message) {
constructor(message: Message) {
if (!message.MessageId) {
throw new TypeError('Message does not have a MessageId');
}
Expand Down
21 changes: 13 additions & 8 deletions src/services/BaseConfigService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { S3 } from 'aws-sdk';
import {
DeleteObjectCommand,
GetObjectCommand,
PutObjectCommand,
S3Client,
} from '@aws-sdk/client-s3';

import DependencyAwareClass from '../core/DependencyAwareClass';
import LambdaTermination from '../utils/LambdaTermination';
Expand Down Expand Up @@ -64,7 +69,7 @@ export default class BaseConfigService extends DependencyAwareClass {
* Returns an S3 client.
*/
static get client() {
return new S3({
return new S3Client({
region: process.env.REGION,
});
}
Expand All @@ -80,9 +85,9 @@ export default class BaseConfigService extends DependencyAwareClass {
* Deletes the configuration stored on S3. Helpful in feature tests.
*/
async delete() {
return this.client.deleteObject(
return this.client.send(new DeleteObjectCommand(
(this.constructor as typeof BaseConfigService).s3config,
).promise();
));
}

/**
Expand All @@ -91,10 +96,10 @@ export default class BaseConfigService extends DependencyAwareClass {
* @param config
*/
async put<T>(config: T): Promise<T> {
await this.client.putObject({
await this.client.send(new PutObjectCommand({
...(this.constructor as typeof BaseConfigService).s3config,
Body: JSON.stringify(config),
}).promise();
}));

return config;
}
Expand All @@ -103,9 +108,9 @@ export default class BaseConfigService extends DependencyAwareClass {
* Gets the service configuration.
*/
async get(): Promise<unknown> {
const response = await this.client.getObject(
const response = await this.client.send(new GetObjectCommand(
(this.constructor as typeof BaseConfigService).s3config,
).promise();
));
const body = String(response.Body);

if (!body) {
Expand Down
170 changes: 86 additions & 84 deletions src/services/SQSService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import {
InvokeCommand,
LambdaClient,
} from '@aws-sdk/client-lambda';
import {
DeleteMessageBatchCommand,
GetQueueAttributesCommand,
ListQueuesCommand,
ReceiveMessageCommand,
SQSClient,
SendMessageCommand,
SendMessageCommandInput,
} from '@aws-sdk/client-sqs';
import { NodeHttpHandler } from '@smithy/node-http-handler';
import alai from 'alai';
import { each } from 'async';
import AWS from 'aws-sdk';
import { v4 as uuid } from 'uuid';

import DependencyAwareClass from '../core/DependencyAwareClass';
Expand Down Expand Up @@ -210,9 +223,9 @@ export default class SQSService<

readonly queueUrls: Record<QueueName<TConfig>, string>;

private $sqs?: AWS.SQS;
private $sqs?: SQSClient;

private $lambda?: AWS.Lambda;
private $lambda?: LambdaClient;

constructor(di: DependencyInjection<TConfig>) {
super(di);
Expand Down Expand Up @@ -252,14 +265,14 @@ export default class SQSService<
*/
get sqs() {
if (!this.$sqs) {
this.$sqs = new AWS.SQS({
this.$sqs = new SQSClient({
region: process.env.REGION,
httpOptions: {
requestHandler: new NodeHttpHandler({
// longest publish on NOTV took 5 seconds
connectTimeout: 8 * 1000,
timeout: 8 * 1000,
},
maxRetries: 3, // default is 3, we can change that
connectionTimeout: 8 * 1000,
socketTimeout: 8 * 1000,
}),
maxAttempts: 4, // default from AWS SDK v2 was 3 retries for SQS
});
}

Expand All @@ -278,7 +291,7 @@ export default class SQSService<
}

// move to subprocess
this.$lambda = new AWS.Lambda({
this.$lambda = new LambdaClient({
region: process.env.AWS_REGION,
endpoint,
});
Expand Down Expand Up @@ -331,21 +344,16 @@ export default class SQSService<
resolve();
}

this.sqs.deleteMessageBatch(
{
Entries: messagesForDeletion,
QueueUrl: queueUrl,
},
(error) => {
timer.stop(timerId);

if (error) {
logger.error(error);
}

resolve();
},
);
this.sqs.send(new DeleteMessageBatchCommand({
Entries: messagesForDeletion,
QueueUrl: queueUrl,
})).finally(() => {
timer.stop(timerId);
}).catch((error) => {
logger.error(error);
}).then(() => {
resolve();
});
},
);
});
Expand All @@ -362,25 +370,27 @@ export default class SQSService<
return new Promise((resolve) => {
timer.start(timerId);

this.sqs.listQueues({}, (error, data) => {
timer.stop(timerId);

let status: Status = 'OK';
let status: Status = 'OK';

if (error) {
this.sqs.send(new ListQueuesCommand())
.finally(() => {
timer.stop(timerId);
})
.then((data) => {
if (typeof data.QueueUrls === 'undefined' || data.QueueUrls.length === 0) {
status = 'APPLICATION_FAILURE';
}
})
.catch((error) => {
logger.error(error);
status = 'APPLICATION_FAILURE';
}

if (typeof data.QueueUrls === 'undefined' || data.QueueUrls.length === 0) {
status = 'APPLICATION_FAILURE';
}

resolve({
service: 'SQS',
status,
})
.then(() => {
resolve({
service: 'SQS',
status,
});
});
});
});
}

Expand All @@ -398,23 +408,18 @@ export default class SQSService<
return new Promise((resolve) => {
timer.start(timerId);

this.sqs.getQueueAttributes(
{
AttributeNames: ['ApproximateNumberOfMessages'],
QueueUrl: queueUrl,
},
(error, data) => {
timer.stop(timerId);

if (error) {
logger.error(error);
resolve(0);
}

const messageCount = data.Attributes?.ApproximateNumberOfMessages || '0';
resolve(Number.parseInt(messageCount, 10));
},
);
this.sqs.send(new GetQueueAttributesCommand({
AttributeNames: ['ApproximateNumberOfMessages'],
QueueUrl: queueUrl,
})).finally(() => {
timer.stop(timerId);
}).then((data) => {
const messageCount = data.Attributes?.ApproximateNumberOfMessages || '0';
resolve(Number.parseInt(messageCount, 10));
}).catch((error) => {
logger.error(error);
resolve(0);
});
});
}

Expand Down Expand Up @@ -444,7 +449,7 @@ export default class SQSService<

timer.start(timerId);

const messageParameters: AWS.SQS.SendMessageRequest = {
const messageParameters: SendMessageCommandInput = {
MessageBody: JSON.stringify(messageObject),
QueueUrl: queueUrl,
};
Expand All @@ -458,7 +463,7 @@ export default class SQSService<
if (this.di.isOffline && SQSService.offlineMode === SQS_OFFLINE_MODES.DIRECT) {
await this.publishOffline(queue, messageParameters);
} else {
await this.sqs.sendMessage(messageParameters).promise();
await this.sqs.send(new SendMessageCommand(messageParameters));
}
} catch (error) {
if (failureMode === SQS_PUBLISH_FAILURE_MODES.CATCH) {
Expand All @@ -481,7 +486,7 @@ export default class SQSService<
* @param queue
* @param messageParameters
*/
async publishOffline(queue: QueueName<TConfig>, messageParameters: AWS.SQS.SendMessageRequest) {
async publishOffline(queue: QueueName<TConfig>, messageParameters: SendMessageCommandInput) {
if (!this.di.isOffline) {
throw new Error('Can only publishOffline while running serverless offline.');
}
Expand All @@ -505,9 +510,11 @@ export default class SQSService<
],
});

const parameters = { FunctionName, InvocationType, Payload };

await this.lambda.invoke(parameters).promise();
await this.lambda.send(new InvokeCommand({
FunctionName,
InvocationType,
Payload,
}));
}

/**
Expand All @@ -525,27 +532,22 @@ export default class SQSService<
return new Promise((resolve, reject) => {
timer.start(timerId);

this.sqs.receiveMessage(
{
QueueUrl: queueUrl,
VisibilityTimeout: timeout,
MaxNumberOfMessages: 10,
},
(error, data) => {
timer.stop(timerId);

if (error) {
logger.error(error);
return reject(error);
}

if (typeof data.Messages === 'undefined') {
return resolve([]);
}

return resolve(data.Messages.map((message) => new SQSMessageModel(message)));
},
);
this.sqs.send(new ReceiveMessageCommand({
QueueUrl: queueUrl,
VisibilityTimeout: timeout,
MaxNumberOfMessages: 10,
})).finally(() => {
timer.stop(timerId);
}).then((data) => {
if (typeof data.Messages === 'undefined') {
resolve([]);
} else {
resolve(data.Messages.map((message) => new SQSMessageModel(message)));
}
}).catch((error) => {
logger.error(error);
reject(error);
});
});
}
}
Loading

0 comments on commit fcfe5d6

Please sign in to comment.