Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for cloudwatch alarms #17

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ No modules.
| [aws_cloudwatch_event_rule.msk_health_lambda_schedule](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_rule) | resource |
| [aws_cloudwatch_event_target.msk_health_lambda_target](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_target) | resource |
| [aws_cloudwatch_log_group.msk_health_lambda_log_groups](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_cloudwatch_metric_alarm.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_metric_alarm) | resource |
| [aws_iam_policy.msk_health_lambda_role_policy](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_iam_role.msk_health_lambda_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource |
| [aws_iam_role_policy_attachment.msk_health_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource |
Expand All @@ -50,11 +51,14 @@ No modules.

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_cluster_arns"></a> [cluster\_arns](#input\_cluster\_arns) | List of MSK cluster ARNs. Default is empty list. | `list(string)` | `[]` | no |
| <a name="input_email"></a> [email](#input\_email) | List of e-mail addresses subscribing to the SNS topic. Default is empty list. | `list(string)` | `[]` | no |
| <a name="input_enable_cloudwatch_alarms"></a> [enable\_cloudwatch\_alarms](#input\_enable\_cloudwatch\_alarms) | Setup CloudWatch alarms for the MSK clusters state. For each state a separate alarm will be created. Default is false. | `bool` | `false` | no |
| <a name="input_enable_sns_notifications"></a> [enable\_sns\_notifications](#input\_enable\_sns\_notifications) | Setup SNS notifications for the MSK clusters state. Default is false. | `bool` | `false` | no |
| <a name="input_ignore_states"></a> [ignore\_states](#input\_ignore\_states) | Suppress warnings for the listed MSK states. Default: ['MAINTENANCE'] | `list(string)` | <pre>[<br> "MAINTENANCE"<br>]</pre> | no |
| <a name="input_log_retion_period_in_days"></a> [log\_retion\_period\_in\_days](#input\_log\_retion\_period\_in\_days) | Number of days logs will be retained. Default is 365 days. | `number` | `365` | no |
| <a name="input_memory_size"></a> [memory\_size](#input\_memory\_size) | Amount of memory in MByte that the Lambda Function can use at runtime. Default is 160. | `number` | `160` | no |
| <a name="input_schedule_expression"></a> [schedule\_expression](#input\_schedule\_expression) | The schedule expression for the CloudWatch event rule. Default is 'rate(15 minutes)'. | `string` | `"rate(15 minutes)"` | no |
| <a name="input_schedule_expression"></a> [schedule\_expression](#input\_schedule\_expression) | The schedule expression for the CloudWatch event rule. Default is 'rate(5 minutes)'. | `string` | `"rate(5 minutes)"` | no |
| <a name="input_tags"></a> [tags](#input\_tags) | A map of tags to add to all resources. Default is empty map. | `map(string)` | `{}` | no |

## Outputs
Expand Down
5 changes: 4 additions & 1 deletion examples/01_default_configuration/main.tf
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
module "msk_monitor" {
source = "../.."
source = "../.."
cluster_arns = []
enable_cloudwatch_alarms = true
schedule_expression = "rate(2 minutes)"
tags = {
"Name" = "msk-monitor"
}
Expand Down
81 changes: 63 additions & 18 deletions functions/check-msk-status/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@


def lambda_handler(event, context):
CLUSTER_ARNS = os.environ["CLUSTER_ARNS"].split(",")
ENABLE_CLOUDWATCH_METRICS = os.environ["ENABLE_CLOUDWATCH_METRICS"]
ENABLE_SNS_NOTIFICATIONS = os.environ["ENABLE_SNS_NOTIFICATIONS"]
LAMBDASNSTOPIC = os.environ["SNS_TOPIC_ARN"]
SUPPRESS_STATES = os.environ["SUPPRESS_STATES"].split(",")

region = "eu-central-1"
# Create an MSK client
client = boto3.client("kafka", region_name=region)

# Create boto clients
kafka = boto3.client("kafka", region_name=region)
cloudwatch = boto3.client("cloudwatch")
sns = boto3.client("sns")

# Retrieve a list of clusters
response = client.list_clusters_v2()
response = kafka.list_clusters_v2()
# Extract the cluster ARNs from the response
cluster_arns = response["ClusterInfoList"]

Expand All @@ -20,28 +28,65 @@ def lambda_handler(event, context):
)
)

for cluster in cluster_arns:
arn = cluster["ClusterArn"]
response = client.describe_cluster_v2(ClusterArn=arn)
for arn in CLUSTER_ARNS:
try:
response = kafka.describe_cluster_v2(ClusterArn=arn)
except Exception as e:
print(f"An error occurred when trying to describe the cluster {arn}: {e}")
continue

status = response["ClusterInfo"]["State"]
print("The cluster {} is in state {}.".format(arn,status))
sns_client = boto3.client("sns")
if status not in valid_states:
print("The MSK cluster: {} needs attention.".format(arn))
sns_client.publish(
TopicArn=LAMBDASNSTOPIC,
Message="MSK cluster: "
+ arn
+ " needs attention. The status is: "
+ status,
Subject="MSK Health Warning!",
cluster_name = response["ClusterInfo"]["ClusterName"]
arn_parts = arn.split(":")
account_id = arn_parts[4]
print(
"The cluster {} in account {} is in state {}.".format(
cluster_name, account_id, status
)
)

# Cover situation where cluster has been deleted.
if ENABLE_CLOUDWATCH_METRICS:
x = 1 if status not in valid_states else 0
put_custom_metric(cloudwatch=cloudwatch, cluster_name=cluster_name, value=x)
print(
"Put custom metric for cluster: {} with value: {}".format(
cluster_name, x
)
)
if ENABLE_SNS_NOTIFICATIONS:
if status not in valid_states:
print("The MSK cluster: {} needs attention.".format(arn))
sns.publish(
TopicArn=LAMBDASNSTOPIC,
Message="MSK cluster "
+ cluster_name
+ " needs attention. The status is "
+ status,
Subject="MSK Health Warning!",
)
else:
print(
"The MSK cluster: {} is in a healthy state, and is reachable and available for use.".format(
"The MSK cluster {} is in a healthy state, and is reachable and available for use.".format(
arn
)
)

# Return the status
return {"statusCode": 200, "body": "OK"}


def put_custom_metric(cloudwatch, cluster_name: str, value: int):
return cloudwatch.put_metric_data(
MetricData=[
{
"MetricName": "Status",
"Dimensions": [
{"Name": "ClusterName", "Value": cluster_name},
],
"Unit": "None",
"Value": value,
},
],
Namespace="Custom/Kafka",
)
39 changes: 35 additions & 4 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ resource "aws_iam_role_policy_attachment" "msk_health_permissions" {
aws_iam_role.msk_health_lambda_role]
}

### TODO: check describe ClusterV2 permissions
# iam policy for lambda role
resource "aws_iam_policy" "msk_health_lambda_role_policy" {
name = "msk-health-lambda-role-policy-${random_id.id.hex}"
path = "/"
Expand Down Expand Up @@ -80,6 +78,13 @@ resource "aws_iam_policy" "msk_health_lambda_role_policy" {
"Resource": "arn:aws:kafka:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:cluster/*",
"Effect": "Allow"
},
{
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": "*",
"Effect": "Allow"
},
{
"Action": [
"sns:Publish"
Expand Down Expand Up @@ -110,8 +115,11 @@ resource "aws_lambda_function" "msk_health_lambda" {
}
environment {
variables = {
SNS_TOPIC_ARN = aws_sns_topic.msk_health_sns_topic.arn
SUPPRESS_STATES = join(",", var.ignore_states)
CLUSTER_ARNS = join(",", var.cluster_arns)
SNS_TOPIC_ARN = aws_sns_topic.msk_health_sns_topic.arn
ENABLE_CLOUDWATCH_METRICS = var.enable_cloudwatch_alarms
ENABLE_SNS_NOTIFICATIONS = var.enable_sns_notifications
SUPPRESS_STATES = join(",", var.ignore_states)
}
}

Expand Down Expand Up @@ -146,3 +154,26 @@ resource "aws_cloudwatch_log_group" "msk_health_lambda_log_groups" {
retention_in_days = var.log_retion_period_in_days
tags = var.tags
}


resource "aws_cloudwatch_metric_alarm" "this" {
for_each = toset(local.cluster_names)
namespace = "Custom/Kafka"
period = 300
metric_name = "Status"
alarm_name = "msk_status_monitor-${each.key}-${random_id.id.hex}"
comparison_operator = "GreaterThanThreshold"
alarm_description = "This alarm triggers on MSK cluster status"
evaluation_periods = 2
statistic = "Average"
threshold = 0
insufficient_data_actions = []
dimensions = {
ClusterName = each.key
}
tags = var.tags
}

locals {
cluster_names = var.enable_cloudwatch_alarms ? sort([for arn in var.cluster_arns : element(split("/", arn), 1)]) : []
}
22 changes: 20 additions & 2 deletions variables.tf
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
variable "cluster_arns" {
description = "List of MSK cluster ARNs. Default is empty list."
type = list(string)
default = []
}

variable "email" {
description = "List of e-mail addresses subscribing to the SNS topic. Default is empty list."
type = list(string)
default = []
}

variable "enable_cloudwatch_alarms" {
description = "Setup CloudWatch alarms for the MSK clusters state. For each state a separate alarm will be created. Default is false."
type = bool
default = false
}

variable "enable_sns_notifications" {
description = "Setup SNS notifications for the MSK clusters state. Default is false."
type = bool
default = false
}

variable "ignore_states" {
description = "Suppress warnings for the listed MSK states. Default: ['MAINTENANCE']"
type = list(string)
Expand Down Expand Up @@ -35,9 +53,9 @@ variable "memory_size" {
}

variable "schedule_expression" {
description = "The schedule expression for the CloudWatch event rule. Default is 'rate(15 minutes)'."
description = "The schedule expression for the CloudWatch event rule. Default is 'rate(5 minutes)'."
type = string
default = "rate(15 minutes)"
default = "rate(5 minutes)"
}

variable "tags" {
Expand Down
Loading