Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

add basic support for QueueConfigurations #598

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 159 additions & 10 deletions serverless-s3-local/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,29 @@ class ServerlessS3Local {
});
}

getSQSClient() {
if (this._sqsClient) return this._sqsClient;

// dependency of serverless-offline-sqs
const SQSClient = require("aws-sdk/clients/sqs");
const customConfig =
this.service.custom && this.service.custom["serverless-offline-sqs"];
const config = {
accessKeyId: defaultOptions.accessKeyId,
secretAccessKey: defaultOptions.secretAccessKey,
...customConfig,
};
this._sqsClient = new SQSClient({
region: this.service.provider.region,
credentials: {
accessKeyId: config.accessKeyId,
secretAccessKey: config.secretAccessKey,
},
...config,
});
return this._sqsClient;
}

getServiceRuntime() {
// Following codes are derived from serverless/index.js
let serviceRuntime = this.service.provider.runtime;
Expand Down Expand Up @@ -407,6 +430,12 @@ class ServerlessS3Local {
}

getEventHandlers() {
const functionHandlers = this.getFunctionHandlers();
const notificationHandlers = this.getNotificationConfigurationHandlers();
return [].concat(functionHandlers, notificationHandlers);
}

getFunctionHandlers() {
if (
typeof this.service !== "object" ||
typeof this.service.functions !== "object"
Expand Down Expand Up @@ -471,12 +500,11 @@ class ServerlessS3Local {
const pattern = existingEvent.replace(/^s3:/, "").replace("*", ".*");
eventHandlers.push(
ServerlessS3Local.buildEventHandler(
s3,
name,
pattern,
s3Rules,
func
)
)
);
});
this.serverless.cli.log(`Found S3 event listener for ${name}`);
Expand All @@ -486,7 +514,101 @@ class ServerlessS3Local {
return eventHandlers;
}

static buildEventHandler(s3, name, pattern, s3Rules, func) {
getNotificationConfigurationHandlers() {
if (
typeof this.service !== "object" ||
typeof this.service.functions !== "object" ||
!this.hasSQSOfflinePlugin()
) {
return [];
}

const eventHandlers = [];

Object.values(this.service.resources.Resources).forEach((resource) => {
if (resource.Type !== "AWS::S3::Bucket") return;
if (
!resource.Properties ||
!resource.Properties.NotificationConfiguration
)
return;
// TODO support other: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-s3-bucket-notificationconfig.html
const queueConfigurations =
resource.Properties.NotificationConfiguration.QueueConfigurations;
if (queueConfigurations) {
queueConfigurations.forEach((queueConfiguration) => {
const { Event, Queue, Filter } = queueConfiguration;
const queueName = this.getQueueNameForQueue(Queue);
const pattern = Event.replace(/^s3:/, "").replace("*", ".*");
let rules = [];
if (Filter) {
rules = Filter.S3Key.Rules.map((rule) => ({
[rule.Name]: rule.Value,
}));
}
const func = async (s3Event) => {
const baseEnvironment = {
IS_LOCAL: true,
IS_OFFLINE: true,
};

try {
Object.assign(
process.env,
baseEnvironment,
this.service.provider.environment
);
const sqs = this.getSQSClient();
const queueUrl = await new Promise((res, rej) =>
sqs
.getQueueUrl(
{
QueueName: queueName,
},
(err, data) => {
if (err) return rej(err);
return res(data.QueueUrl);
}
)
.send()
);
await new Promise((res, rej) =>
sqs
.sendMessage(
{
QueueUrl: queueUrl,
MessageBody: JSON.stringify(s3Event),
},
(err, data) => {
if (err) return rej(err);
return res(data);
}
)
.send()
);
} catch (e) {
this.serverless.cli.log("Error while running handler", e);
}
};
eventHandlers.push(
ServerlessS3Local.buildEventHandler(
resource.Properties.BucketName,
pattern,
rules,
func
)
);
});
this.serverless.cli.log(
`Found S3 event listener for ${resource.Properties.BucketName}`
);
}
});

return eventHandlers;
}

static buildEventHandler(name, pattern, s3Rules, func) {
const rule2regex = (rule) =>
Object.keys(rule).map(
(key) =>
Expand All @@ -505,12 +627,33 @@ class ServerlessS3Local {
}

getResourceForBucket(bucketName) {
const logicalResourceName = `S3Bucket${bucketName
.charAt(0)
.toUpperCase()}${bucketName.substr(1)}`;
return this.service.resources && this.service.resources.Resources
? this.service.resources.Resources[logicalResourceName]
: false;
if (!this.service.resources || !this.service.resources.Resources)
return false;
const bucketResource = Object.values(this.service.resources.Resources).find(
(resource) =>
resource.Type === "AWS::S3::Bucket" &&
resource.Properties &&
resource.Properties.BucketName === bucketName
);

return bucketResource ? bucketResource : false;
}

getQueueNameForQueue(Queue) {
if (!Queue) return null
if(typeof Queue === 'string' && Queue.startsWith('arn:')) {
return Queue.replace(/(?:[^:]+:)+/, "");
}
if (Queue['Fn::GetAtt']) {
const resourceName = Queue['Fn::GetAtt'][0];
if (!this.service.resources || !this.service.resources.Resources) return null;
const resource = this.service.resources.Resources[resourceName];
if (!resource) return null;
return resource.Properties.QueueName;
}
if (Queue.Type === 'AWS::SQS::QUEUE') {
return Queue.Properties.QueueName;
}
}

getAdditionalStacks() {
Expand Down Expand Up @@ -542,6 +685,10 @@ class ServerlessS3Local {
return this.hasPlugin("existing-s3");
}

hasSQSOfflinePlugin() {
return this.hasPlugin("serverless-offline-sqs");
}

/**
* Get bucket list from serverless.yml resources and additional stacks
*
Expand Down Expand Up @@ -607,7 +754,7 @@ class ServerlessS3Local {
[]
);

return Object.keys(resources)
const bucketsListMerged = Object.keys(resources)
.map((key) => {
if (
resources[key].Type === "AWS::S3::Bucket" &&
Expand All @@ -621,6 +768,8 @@ class ServerlessS3Local {
.concat(this.options.buckets)
.concat(eventSourceBuckets)
.filter((n) => n);

return [...new Set(bucketsListMerged)];
}

setOptions() {
Expand Down