From 2a95eddf1e44bcc1ea240be528282d26a7fc386b Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 14 Aug 2024 16:37:09 +0800 Subject: [PATCH] feat(metrics): add topic partition distribution metric (#1782) (#1792) Signed-off-by: Shichao Nie --- .../autobalancer/model/ClusterModel.java | 1 + .../scala/kafka/server/ReplicaManager.scala | 8 ++++ .../s3stream/PartitionCountDistribution.java | 38 +++++++++++++++++++ .../S3StreamKafkaMetricsConstants.java | 3 ++ .../s3stream/S3StreamKafkaMetricsManager.java | 25 ++++++++++++ 5 files changed, 75 insertions(+) create mode 100644 server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/PartitionCountDistribution.java diff --git a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java index f9440c82ca..768276dd44 100644 --- a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java +++ b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java @@ -42,6 +42,7 @@ public class ClusterModel { protected final Map brokerMap = new HashMap<>(); protected final Map> brokerReplicaMap = new HashMap<>(); protected final Map idToTopicNameMap = new HashMap<>(); + // > protected final Map> topicPartitionReplicaMap = new HashMap<>(); public ClusterModel() { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 2cdf87e3fa..169021ef77 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -57,6 +57,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.server.common import org.apache.kafka.server.common.DirectoryEventHandler import org.apache.kafka.server.metrics.KafkaMetricsGroup +import org.apache.kafka.server.metrics.s3stream.{PartitionCountDistribution, S3StreamKafkaMetricsManager} import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard} @@ -336,6 +337,13 @@ class ReplicaManager(val config: KafkaConfig, metricsGroup.newGauge(PartitionsWithLateTransactionsCountMetricName, () => lateTransactionsCount) metricsGroup.newGauge(ProducerIdCountMetricName, () => producerIdCount) + S3StreamKafkaMetricsManager.setTopicPartitionCountMetricsSupplier(() => new PartitionCountDistribution(config.nodeId, + config.rack.getOrElse(""), partitionDistribution().asJava)) + + def partitionDistribution(): Map[String, Integer] = { + allPartitions.keys.groupBy(_.topic).map(kv => kv._1 -> kv._2.size) + } + private def reassigningPartitionsCount: Int = leaderPartitionsIterator.count(_.isReassigning) private def lateTransactionsCount: Int = { diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/PartitionCountDistribution.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/PartitionCountDistribution.java new file mode 100644 index 0000000000..c3b6e66228 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/PartitionCountDistribution.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024, AutoMQ CO.,LTD. + * + * Use of this software is governed by the Business Source License + * included in the file BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package org.apache.kafka.server.metrics.s3stream; + +import java.util.Map; + +public class PartitionCountDistribution { + private final int brokerId; + private final String rack; + private final Map topicPartitionCount; + + public PartitionCountDistribution(int brokerId, String rack, Map topicPartitionCount) { + this.brokerId = brokerId; + this.rack = rack; + this.topicPartitionCount = topicPartitionCount; + } + + public int brokerId() { + return brokerId; + } + + public String rack() { + return rack; + } + + public Map topicPartitionCount() { + return topicPartitionCount; + } +} diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java index 3a59be1b1c..86ac2543b9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java @@ -22,8 +22,11 @@ public class S3StreamKafkaMetricsConstants { public static final String FETCH_LIMITER_PERMIT_NUM = "fetch_limiter_permit_num"; public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num"; public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count"; + public static final String TOPIC_PARTITION_COUNT_METRIC_NAME = "topic_partition_count"; public static final AttributeKey LABEL_NODE_ID = AttributeKey.stringKey("node_id"); + public static final AttributeKey LABEL_TOPIC_NAME = AttributeKey.stringKey("topic"); + public static final AttributeKey LABEL_RACK_ID = AttributeKey.stringKey("rack"); public static final AttributeKey LABEL_OBJECT_STATE = AttributeKey.stringKey("state"); public static final String S3_OBJECT_PREPARED_STATE = "prepared"; diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java index 6436f4ceea..b1ccd2e375 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java @@ -65,6 +65,8 @@ public class S3StreamKafkaMetricsManager { private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty()); private static ObservableLongGauge slowBrokerMetrics = new NoopObservableLongGauge(); private static Supplier> slowBrokerSupplier = Collections::emptyMap; + private static ObservableLongGauge topicPartitionCountMetrics = new NoopObservableLongGauge(); + private static Supplier topicPartitionCountSupplier = () -> null; private static ObservableLongGauge partitionStatusStatisticsMetrics = new NoopObservableLongGauge(); private static List partitionStatusList = Collections.emptyList(); @@ -112,6 +114,25 @@ private static void initAutoBalancerMetrics(Meter meter, String prefix) { } } }); + topicPartitionCountMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.TOPIC_PARTITION_COUNT_METRIC_NAME) + .setDescription("The number of partitions for each topic on each broker") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get()) { + PartitionCountDistribution partitionCountDistribution = topicPartitionCountSupplier.get(); + if (partitionCountDistribution == null) { + return; + } + for (Map.Entry entry : partitionCountDistribution.topicPartitionCount().entrySet()) { + String topic = entry.getKey(); + result.record(entry.getValue(), Attributes.builder() + .put(S3StreamKafkaMetricsConstants.LABEL_TOPIC_NAME, topic) + .put(S3StreamKafkaMetricsConstants.LABEL_RACK_ID, partitionCountDistribution.rack()) + .put(S3StreamKafkaMetricsConstants.LABEL_NODE_ID, String.valueOf(partitionCountDistribution.brokerId())) + .build()); + } + } + }); } private static void initObjectMetrics(Meter meter, String prefix) { @@ -234,4 +255,8 @@ public static void setPartitionStatusStatisticsSupplier(List partitionSt S3StreamKafkaMetricsManager.partitionStatusList = partitionStatusList; S3StreamKafkaMetricsManager.partitionStatusStatisticsSupplier = partitionStatusStatisticsSupplier; } + + public static void setTopicPartitionCountMetricsSupplier(Supplier topicPartitionCountSupplier) { + S3StreamKafkaMetricsManager.topicPartitionCountSupplier = topicPartitionCountSupplier; + } }