Skip to content

Commit

Permalink
feat(metrics): add topic partition distribution metric (#1782) (#1792)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Aug 14, 2024
1 parent 49af755 commit 2a95edd
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ClusterModel {
protected final Map<Integer, BrokerUpdater> brokerMap = new HashMap<>();
protected final Map<Integer, Map<TopicPartition, TopicPartitionReplicaUpdater>> brokerReplicaMap = new HashMap<>();
protected final Map<Uuid, String> idToTopicNameMap = new HashMap<>();
// <topicName, <partitionId, brokerId>>
protected final Map<String, Map<Integer, Integer>> topicPartitionReplicaMap = new HashMap<>();

public ClusterModel() {
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common
import org.apache.kafka.server.common.DirectoryEventHandler
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.metrics.s3stream.{PartitionCountDistribution, S3StreamKafkaMetricsManager}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}

Expand Down Expand Up @@ -336,6 +337,13 @@ class ReplicaManager(val config: KafkaConfig,
metricsGroup.newGauge(PartitionsWithLateTransactionsCountMetricName, () => lateTransactionsCount)
metricsGroup.newGauge(ProducerIdCountMetricName, () => producerIdCount)

S3StreamKafkaMetricsManager.setTopicPartitionCountMetricsSupplier(() => new PartitionCountDistribution(config.nodeId,
config.rack.getOrElse(""), partitionDistribution().asJava))

/**
 * Builds a per-topic count of the entries currently held in `allPartitions`.
 *
 * @return a map from topic name to the number of `allPartitions` keys belonging to that topic
 */
def partitionDistribution(): Map[String, Integer] = {
  val partitionsByTopic = allPartitions.keys.groupBy(_.topic)
  partitionsByTopic.map { case (topicName, topicPartitions) =>
    // Box explicitly so the values are java.lang.Integer, as the return type requires.
    topicName -> Integer.valueOf(topicPartitions.size)
  }
}

private def reassigningPartitionsCount: Int = leaderPartitionsIterator.count(_.isReassigning)

private def lateTransactionsCount: Int = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.server.metrics.s3stream;

import java.util.Map;

/**
 * Value holder that pairs a broker's identity (id and rack) with a
 * per-topic partition count map.
 *
 * <p>The map handed to the constructor is stored by reference; no copy is made,
 * so later changes to that map are visible through {@link #topicPartitionCount()}.
 */
public class PartitionCountDistribution {
    private final int nodeId;
    private final String rackId;
    private final Map<String, Integer> partitionsByTopic;

    /**
     * Creates a distribution snapshot holder.
     *
     * @param brokerId            id of the broker the counts belong to
     * @param rack                rack of that broker
     * @param topicPartitionCount partition count keyed by topic name
     */
    public PartitionCountDistribution(int brokerId, String rack, Map<String, Integer> topicPartitionCount) {
        nodeId = brokerId;
        rackId = rack;
        partitionsByTopic = topicPartitionCount;
    }

    /**
     * @return the broker id supplied at construction
     */
    public int brokerId() {
        return this.nodeId;
    }

    /**
     * @return the rack supplied at construction
     */
    public String rack() {
        return this.rackId;
    }

    /**
     * @return the same map instance that was supplied at construction
     */
    public Map<String, Integer> topicPartitionCount() {
        return this.partitionsByTopic;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ public class S3StreamKafkaMetricsConstants {
public static final String FETCH_LIMITER_PERMIT_NUM = "fetch_limiter_permit_num";
public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num";
public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count";
public static final String TOPIC_PARTITION_COUNT_METRIC_NAME = "topic_partition_count";

public static final AttributeKey<String> LABEL_NODE_ID = AttributeKey.stringKey("node_id");
public static final AttributeKey<String> LABEL_TOPIC_NAME = AttributeKey.stringKey("topic");
public static final AttributeKey<String> LABEL_RACK_ID = AttributeKey.stringKey("rack");

public static final AttributeKey<String> LABEL_OBJECT_STATE = AttributeKey.stringKey("state");
public static final String S3_OBJECT_PREPARED_STATE = "prepared";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class S3StreamKafkaMetricsManager {
private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty());
private static ObservableLongGauge slowBrokerMetrics = new NoopObservableLongGauge();
private static Supplier<Map<Integer, Boolean>> slowBrokerSupplier = Collections::emptyMap;
private static ObservableLongGauge topicPartitionCountMetrics = new NoopObservableLongGauge();
private static Supplier<PartitionCountDistribution> topicPartitionCountSupplier = () -> null;

private static ObservableLongGauge partitionStatusStatisticsMetrics = new NoopObservableLongGauge();
private static List<String> partitionStatusList = Collections.emptyList();
Expand Down Expand Up @@ -112,6 +114,25 @@ private static void initAutoBalancerMetrics(Meter meter, String prefix) {
}
}
});
topicPartitionCountMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.TOPIC_PARTITION_COUNT_METRIC_NAME)
.setDescription("The number of partitions for each topic on each broker")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && isActiveSupplier.get()) {
PartitionCountDistribution partitionCountDistribution = topicPartitionCountSupplier.get();
if (partitionCountDistribution == null) {
return;
}
for (Map.Entry<String, Integer> entry : partitionCountDistribution.topicPartitionCount().entrySet()) {
String topic = entry.getKey();
result.record(entry.getValue(), Attributes.builder()
.put(S3StreamKafkaMetricsConstants.LABEL_TOPIC_NAME, topic)
.put(S3StreamKafkaMetricsConstants.LABEL_RACK_ID, partitionCountDistribution.rack())
.put(S3StreamKafkaMetricsConstants.LABEL_NODE_ID, String.valueOf(partitionCountDistribution.brokerId()))
.build());
}
}
});
}

private static void initObjectMetrics(Meter meter, String prefix) {
Expand Down Expand Up @@ -234,4 +255,8 @@ public static void setPartitionStatusStatisticsSupplier(List<String> partitionSt
S3StreamKafkaMetricsManager.partitionStatusList = partitionStatusList;
S3StreamKafkaMetricsManager.partitionStatusStatisticsSupplier = partitionStatusStatisticsSupplier;
}

/**
 * Registers the supplier that the topic partition count gauge reads on each collection.
 *
 * <p>The supplier itself may return {@code null} to signal that no distribution is
 * available; the gauge callback skips recording in that case.
 *
 * @param topicPartitionCountSupplier source of the current partition distribution;
 *                                    {@code null} resets to a supplier that reports no data
 */
public static void setTopicPartitionCountMetricsSupplier(Supplier<PartitionCountDistribution> topicPartitionCountSupplier) {
    if (topicPartitionCountSupplier == null) {
        // The gauge callback calls get() on the stored supplier unconditionally, so a
        // null reference would throw on every collection. Fall back to the same
        // "no data" supplier the field is initialised with.
        S3StreamKafkaMetricsManager.topicPartitionCountSupplier = () -> null;
        return;
    }
    S3StreamKafkaMetricsManager.topicPartitionCountSupplier = topicPartitionCountSupplier;
}
}

0 comments on commit 2a95edd

Please sign in to comment.