Skip to content

Commit

Permalink
Add column BootstrapBrokers(bootstrap_broker_string, bootstrap_broker…
Browse files Browse the repository at this point in the history
…_string_tls) in table aws_msk_cluster (#2390) (#2399)



Co-authored-by: KOH JUHO <[email protected]>
  • Loading branch information
ParthaI and insummersnow authored Feb 6, 2025
1 parent 59238a4 commit 355f316
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions aws/table_aws_msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func tableAwsMSKCluster(_ context.Context) *plugin.Table {
Func: getKafkaClusterOperation,
Tags: map[string]string{"service": "kafka", "action": "DescribeClusterOperation"},
},
{
Func: getKafkaClusterBootstrapBrokers,
Tags: map[string]string{"service": "kafka", "action": "GetBootstrapBrokers"},
},
},
GetMatrixItemFunc: SupportedRegionMatrix(kafkav1.EndpointsID),
Columns: awsRegionalColumns([]*plugin.Column{
Expand Down Expand Up @@ -94,6 +98,18 @@ func tableAwsMSKCluster(_ context.Context) *plugin.Table {
Hydrate: getKafkaClusterOperation,
Transform: transform.FromValue(),
},
{
Name: "bootstrap_broker_string",
Description: "A string containing one or more hostname:port pairs of Kafka brokers suitable for use with Apache Kafka clients.",
Type: proto.ColumnType_STRING,
Hydrate: getKafkaClusterBootstrapBrokers,
},
{
Name: "bootstrap_broker_string_tls",
Description: "A string containing one or more hostname:port pairs of Kafka brokers suitable for TLS authentication.",
Type: proto.ColumnType_STRING,
Hydrate: getKafkaClusterBootstrapBrokers,
},
{
Name: "provisioned",
Description: "Information about the provisioned cluster.",
Expand Down Expand Up @@ -306,3 +322,37 @@ func getKafkaClusterConfiguration(ctx context.Context, d *plugin.QueryData, h *p

return op, nil
}

func getKafkaClusterBootstrapBrokers(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {
logger := plugin.Logger(ctx)
cluster := h.Item.(types.Cluster)

clusterArn := aws.ToString(cluster.ClusterArn)
// Empty check
if clusterArn == "" {
return nil, nil
}

svc, err := KafkaClient(ctx, d)
if err != nil {
logger.Error("aws_msk_cluster.getKafkaClusterBootstrapBrokers", "service_creation_error", err)
return nil, err
}

// Unsupported region, return no data
if svc == nil {
return nil, nil
}

params := &kafka.GetBootstrapBrokersInput{
ClusterArn: &clusterArn,
}

op, err := svc.GetBootstrapBrokers(ctx, params)
if err != nil {
logger.Error("aws_msk_cluster.getKafkaClusterBootstrapBrokers", "api_error", err)
return nil, err
}

return op, nil
}

0 comments on commit 355f316

Please sign in to comment.