forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRuleScheduleExpression.py
125 lines (101 loc) · 4.44 KB
/
RuleScheduleExpression.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from cfnlint.rules import CloudFormationLintRule, RuleMatch
class RuleScheduleExpression(CloudFormationLintRule):
"""Validate AWS Events Schedule expression format"""
id = "E3027"
shortdesc = "Validate AWS Event ScheduleExpression format"
description = "Validate the formation of the AWS::Event ScheduleExpression"
source_url = "https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html"
tags = ["resources", "events"]
def initialize(self, cfn):
"""Initialize the rule"""
self.resource_property_types = ["AWS::Events::Rule"]
def check_rate(self, value, path):
"""Check Rate configuration"""
matches = []
# Extract the expression from rate(XXX)
rate_expression = value[value.find("(") + 1 : value.find(")")]
if not rate_expression:
return [RuleMatch(path, "Rate value of ScheduleExpression cannot be empty")]
# Rate format: rate(Value Unit)
items = rate_expression.split(" ")
if len(items) != 2:
message = "Rate expression must contain 2 elements (Value Unit), rate contains {} elements"
matches.append(RuleMatch(path, message.format(len(items))))
return [RuleMatch(path, message.format(len(items)))]
# Check the Value
if not items[0].isdigit():
message = "Rate Value ({}) should be of type Integer."
extra_args = {
"actual_type": type(items[0]).__name__,
"expected_type": int.__name__,
}
return [RuleMatch(path, message.format(items[0]), **extra_args)]
if float(items[0]) <= 0:
return [
RuleMatch(path, f"Rate Value {items[0]!r} should be greater than 0.")
]
if float(items[0]) <= 1:
valid_periods = ["minute", "hour", "day"]
elif float(items[0]) > 1:
valid_periods = ["minutes", "hours", "days"]
# Check the Unit
if items[1] not in valid_periods:
return [
RuleMatch(
path, f"Rate Unit {items[1]!r} should be one of {valid_periods!r}."
)
]
return []
def check_cron(self, value, path):
"""Check Cron configuration"""
matches = []
# Extract the expression from cron(XXX)
cron_expression = value[value.find("(") + 1 : value.find(")")]
if not cron_expression:
matches.append(
RuleMatch(path, "Cron value of ScheduleExpression cannot be empty")
)
else:
# Rate format: cron(Minutes Hours Day-of-month Month Day-of-week Year)
items = cron_expression.split(" ")
if len(items) != 6:
message = "Cron expression must contain 6 elements (Minutes Hours Day-of-month Month Day-of-week Year), cron contains {} elements"
matches.append(RuleMatch(path, message.format(len(items))))
return matches
_, _, day_of_month, _, day_of_week, _ = cron_expression.split(" ")
if day_of_month != "?" and day_of_week != "?":
matches.append(
RuleMatch(
path,
"Don't specify the Day-of-month and Day-of-week fields in the same cron expression",
)
)
return matches
def check_value(self, value, path):
"""Count ScheduledExpression value"""
matches = []
# Value is either "cron()" or "rate()"
if value.startswith("rate(") and value.endswith(")"):
matches.extend(self.check_rate(value, path))
elif value.startswith("cron(") and value.endswith(")"):
matches.extend(self.check_cron(value, path))
else:
message = "Invalid ScheduledExpression specified ({}). Value has to be either cron() or rate()"
matches.append(RuleMatch(path, message.format(value)))
return matches
def match_resource_properties(self, properties, _, path, cfn):
"""Check CloudFormation Properties"""
matches = []
matches.extend(
cfn.check_value(
obj=properties,
key="ScheduleExpression",
path=path[:],
check_value=self.check_value,
)
)
return matches