diff --git a/documentation/compose/kafka-ui-arm64.yaml b/documentation/compose/kafka-ui-arm64.yaml
index 082d7cb5af0..91f8dc252ce 100644
--- a/documentation/compose/kafka-ui-arm64.yaml
+++ b/documentation/compose/kafka-ui-arm64.yaml
@@ -13,16 +13,29 @@ services:
- schema-registry0
- kafka-connect0
environment:
+ DYNAMIC_CONFIG_ENABLED: 'true' # not necessary, added for tests
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+ KAFKA_CLUSTERS_0_METRICS_STORE_PROMETHEUS_URL: "http://prometheus:9090"
+ KAFKA_CLUSTERS_0_METRICS_STORE_PROMETHEUS_REMOTEWRITE: 'true'
+ KAFKA_CLUSTERS_0_METRICS_STORE_KAFKA_TOPIC: "kafka_metrics"
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
- DYNAMIC_CONFIG_ENABLED: 'true' # not necessary, added for tests
KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
+ prometheus:
+ image: prom/prometheus:latest
+ hostname: prometheus
+ container_name: prometheus
+ ports:
+ - 9090:9090
+ volumes:
+ - ./scripts:/etc/prometheus
+ command: --web.enable-remote-write-receiver --config.file=/etc/prometheus/prometheus.yaml
+
kafka0:
image: confluentinc/cp-kafka:7.2.1.arm64
hostname: kafka0
diff --git a/documentation/compose/scripts/prometheus.yaml b/documentation/compose/scripts/prometheus.yaml
new file mode 100644
index 00000000000..457de126cd5
--- /dev/null
+++ b/documentation/compose/scripts/prometheus.yaml
@@ -0,0 +1,14 @@
+global:
+ scrape_interval: 30s
+ scrape_timeout: 10s
+
+rule_files:
+ - alert.yml
+
+scrape_configs:
+ - job_name: services
+ metrics_path: /metrics
+ static_configs:
+ - targets:
+ - 'prometheus:9090'
+# - 'kafka-ui:8080'
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 558a446e948..a964a5ebe74 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -239,6 +239,23 @@
spring-security-ldap
+
+ io.prometheus
+ simpleclient
+
+
+ io.prometheus
+ simpleclient_common
+
+
+ io.prometheus
+ simpleclient_pushgateway
+
+
+ org.xerial.snappy
+ snappy-java
+ 1.1.8.4
+
org.codehaus.groovy
diff --git a/kafka-ui-api/src/main/antlr4/promql/PromQLLexer.g4 b/kafka-ui-api/src/main/antlr4/promql/PromQLLexer.g4
new file mode 100644
index 00000000000..9cda649422a
--- /dev/null
+++ b/kafka-ui-api/src/main/antlr4/promql/PromQLLexer.g4
@@ -0,0 +1,176 @@
+lexer grammar PromQLLexer;
+
+channels { WHITESPACE, COMMENTS }
+
+// All keywords in PromQL are case insensitive, it is just function,
+// label and metric names that are not.
+options { caseInsensitive=true; }
+
+fragment NUMERAL: [0-9]+ ('.' [0-9]+)?;
+
+fragment SCIENTIFIC_NUMBER
+ : NUMERAL ('e' [-+]? NUMERAL)?
+ ;
+
+NUMBER
+ : NUMERAL
+ | SCIENTIFIC_NUMBER;
+
+STRING
+ : '\'' (~('\'' | '\\') | '\\' .)* '\''
+ | '"' (~('"' | '\\') | '\\' .)* '"'
+ ;
+
+// Binary operators
+
+ADD: '+';
+SUB: '-';
+MULT: '*';
+DIV: '/';
+MOD: '%';
+POW: '^';
+
+AND: 'and';
+OR: 'or';
+UNLESS: 'unless';
+
+// Comparison operators
+
+EQ: '=';
+DEQ: '==';
+NE: '!=';
+GT: '>';
+LT: '<';
+GE: '>=';
+LE: '<=';
+RE: '=~';
+NRE: '!~';
+
+// Aggregation modifiers
+
+BY: 'by';
+WITHOUT: 'without';
+
+// Join modifiers
+
+ON: 'on';
+IGNORING: 'ignoring';
+GROUP_LEFT: 'group_left';
+GROUP_RIGHT: 'group_right';
+
+OFFSET: 'offset';
+
+BOOL: 'bool';
+
+AGGREGATION_OPERATOR
+ : 'sum'
+ | 'min'
+ | 'max'
+ | 'avg'
+ | 'group'
+ | 'stddev'
+ | 'stdvar'
+ | 'count'
+ | 'count_values'
+ | 'bottomk'
+ | 'topk'
+ | 'quantile'
+ ;
+
+FUNCTION options { caseInsensitive=false; }
+ : 'abs'
+ | 'absent'
+ | 'absent_over_time'
+ | 'ceil'
+ | 'changes'
+ | 'clamp_max'
+ | 'clamp_min'
+ | 'day_of_month'
+ | 'day_of_week'
+ | 'days_in_month'
+ | 'delta'
+ | 'deriv'
+ | 'exp'
+ | 'floor'
+ | 'histogram_quantile'
+ | 'holt_winters'
+ | 'hour'
+ | 'idelta'
+ | 'increase'
+ | 'irate'
+ | 'label_join'
+ | 'label_replace'
+ | 'ln'
+ | 'log2'
+ | 'log10'
+ | 'minute'
+ | 'month'
+ | 'predict_linear'
+ | 'rate'
+ | 'resets'
+ | 'round'
+ | 'scalar'
+ | 'sort'
+ | 'sort_desc'
+ | 'sqrt'
+ | 'time'
+ | 'timestamp'
+ | 'vector'
+ | 'year'
+ | 'avg_over_time'
+ | 'min_over_time'
+ | 'max_over_time'
+ | 'sum_over_time'
+ | 'count_over_time'
+ | 'quantile_over_time'
+ | 'stddev_over_time'
+ | 'stdvar_over_time'
+ | 'last_over_time'
+ | 'acos'
+ | 'acosh'
+ | 'asin'
+ | 'asinh'
+ | 'atan'
+ | 'atanh'
+ | 'cos'
+ | 'cosh'
+ | 'sin'
+ | 'sinh'
+ | 'tan'
+ | 'tanh'
+ | 'deg'
+ | 'pi'
+ | 'rad'
+ ;
+
+LEFT_BRACE: '{';
+RIGHT_BRACE: '}';
+
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+
+LEFT_BRACKET: '[';
+RIGHT_BRACKET: ']';
+
+COMMA: ',';
+
+AT: '@';
+
+SUBQUERY_RANGE
+ : LEFT_BRACKET DURATION ':' DURATION? RIGHT_BRACKET;
+
+TIME_RANGE
+ : LEFT_BRACKET DURATION RIGHT_BRACKET;
+
+// The proper order (longest to the shortest) must be validated after parsing
+DURATION: ([0-9]+ ('ms' | [smhdwy]))+;
+
+METRIC_NAME: [a-z_:] [a-z0-9_:]*;
+LABEL_NAME: [a-z_] [a-z0-9_]*;
+
+
+
+WS: [\r\t\n ]+ -> channel(WHITESPACE);
+SL_COMMENT
+ : '#' .*? '\n' -> channel(COMMENTS)
+ ;
diff --git a/kafka-ui-api/src/main/antlr4/promql/PromQLParser.g4 b/kafka-ui-api/src/main/antlr4/promql/PromQLParser.g4
new file mode 100644
index 00000000000..c7703b709fc
--- /dev/null
+++ b/kafka-ui-api/src/main/antlr4/promql/PromQLParser.g4
@@ -0,0 +1,114 @@
+parser grammar PromQLParser;
+
+options { tokenVocab = PromQLLexer; }
+
+expression: vectorOperation EOF;
+
+// Binary operations are ordered by precedence
+
+// Unary operations have the same precedence as multiplications
+
+vectorOperation
+ : vectorOperation powOp vectorOperation
+ | vectorOperation subqueryOp
+ | unaryOp vectorOperation
+ | vectorOperation multOp vectorOperation
+ | vectorOperation addOp vectorOperation
+ | vectorOperation compareOp vectorOperation
+ | vectorOperation andUnlessOp vectorOperation
+ | vectorOperation orOp vectorOperation
+ | vectorOperation vectorMatchOp vectorOperation
+ | vectorOperation AT vectorOperation
+ | vector
+ ;
+
+// Operators
+
+unaryOp: (ADD | SUB);
+powOp: POW grouping?;
+multOp: (MULT | DIV | MOD) grouping?;
+addOp: (ADD | SUB) grouping?;
+compareOp: (DEQ | NE | GT | LT | GE | LE) BOOL? grouping?;
+andUnlessOp: (AND | UNLESS) grouping?;
+orOp: OR grouping?;
+vectorMatchOp: (ON | UNLESS) grouping?;
+subqueryOp: SUBQUERY_RANGE offsetOp?;
+offsetOp: OFFSET DURATION;
+
+vector
+ : function_
+ | aggregation
+ | instantSelector
+ | matrixSelector
+ | offset
+ | literal
+ | parens
+ ;
+
+parens: LEFT_PAREN vectorOperation RIGHT_PAREN;
+
+// Selectors
+
+instantSelector
+ : METRIC_NAME (LEFT_BRACE labelMatcherList? RIGHT_BRACE)?
+ | LEFT_BRACE labelMatcherList RIGHT_BRACE
+ ;
+
+labelMatcher: labelName labelMatcherOperator STRING;
+labelMatcherOperator: EQ | NE | RE | NRE;
+labelMatcherList: labelMatcher (COMMA labelMatcher)* COMMA?;
+
+matrixSelector: instantSelector TIME_RANGE;
+
+offset
+ : instantSelector OFFSET DURATION
+ | matrixSelector OFFSET DURATION
+ ;
+
+// Functions
+
+function_: FUNCTION LEFT_PAREN (parameter (COMMA parameter)*)? RIGHT_PAREN;
+
+parameter: literal | vectorOperation;
+parameterList: LEFT_PAREN (parameter (COMMA parameter)*)? RIGHT_PAREN;
+
+// Aggregations
+
+aggregation
+ : AGGREGATION_OPERATOR parameterList
+ | AGGREGATION_OPERATOR (by | without) parameterList
+ | AGGREGATION_OPERATOR parameterList ( by | without)
+ ;
+by: BY labelNameList;
+without: WITHOUT labelNameList;
+
+// Vector one-to-one/one-to-many joins
+
+grouping: (on_ | ignoring) (groupLeft | groupRight)?;
+on_: ON labelNameList;
+ignoring: IGNORING labelNameList;
+groupLeft: GROUP_LEFT labelNameList?;
+groupRight: GROUP_RIGHT labelNameList?;
+
+// Label names
+
+labelName: keyword | METRIC_NAME | LABEL_NAME;
+labelNameList: LEFT_PAREN (labelName (COMMA labelName)*)? RIGHT_PAREN;
+
+keyword
+ : AND
+ | OR
+ | UNLESS
+ | BY
+ | WITHOUT
+ | ON
+ | IGNORING
+ | GROUP_LEFT
+ | GROUP_RIGHT
+ | OFFSET
+ | BOOL
+ | AGGREGATION_OPERATOR
+ | FUNCTION
+ ;
+
+literal: NUMBER | STRING;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
index e0b20d6c93f..ed91fc35151 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.config;
-import com.provectus.kafka.ui.model.MetricsConfig;
+import static com.provectus.kafka.ui.model.MetricsScrapeProperties.JMX_METRICS_TYPE;
+
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
@@ -42,7 +43,7 @@ public static class Cluster {
KsqldbServerAuth ksqldbServerAuth;
KeystoreConfig ksqldbServerSsl;
List kafkaConnect;
- MetricsConfigData metrics;
+ MetricsConfig metrics;
Map properties;
boolean readOnly = false;
List serde;
@@ -62,8 +63,8 @@ public static class PollingProperties {
}
@Data
- @ToString(exclude = "password")
- public static class MetricsConfigData {
+ @ToString(exclude = {"password", "keystorePassword"})
+ public static class MetricsConfig {
String type;
Integer port;
Boolean ssl;
@@ -71,6 +72,31 @@ public static class MetricsConfigData {
String password;
String keystoreLocation;
String keystorePassword;
+
+ Boolean prometheusExpose;
+ MetricsStorage store;
+ }
+
+ @Data
+ public static class MetricsStorage {
+ PrometheusStorage prometheus;
+ KafkaMetricsStorage kafka;
+ }
+
+ @Data
+ public static class KafkaMetricsStorage {
+ String topic;
+ }
+
+ @Data
+ @ToString(exclude = {"pushGatewayPassword"})
+ public static class PrometheusStorage {
+ String url;
+ String pushGatewayUrl;
+ String pushGatewayUsername;
+ String pushGatewayPassword;
+ String pushGatewayJobName;
+ Boolean remoteWrite;
}
@Data
@@ -171,7 +197,7 @@ public void validateAndSetDefaults() {
private void setMetricsDefaults() {
for (Cluster cluster : clusters) {
if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) {
- cluster.getMetrics().setType(MetricsConfig.JMX_METRICS_TYPE);
+ cluster.getMetrics().setType(JMX_METRICS_TYPE);
}
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java
index 0c70b79716e..70807295cb4 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java
@@ -18,7 +18,8 @@ protected AbstractAuthSecurityConfig() {
"/login",
"/logout",
"/oauth2/**",
- "/static/**"
+ "/static/**",
+ "/metrics"
};
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java
new file mode 100644
index 00000000000..b4bbcd92710
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java
@@ -0,0 +1,79 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.GraphsApi;
+import com.provectus.kafka.ui.model.GraphDataRequestDTO;
+import com.provectus.kafka.ui.model.GraphDescriptionDTO;
+import com.provectus.kafka.ui.model.GraphDescriptionsDTO;
+import com.provectus.kafka.ui.model.GraphParameterDTO;
+import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDTO;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.service.graphs.GraphDescription;
+import com.provectus.kafka.ui.service.graphs.GraphsService;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.mapstruct.Mapper;
+import org.mapstruct.factory.Mappers;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import prometheus.query.model.QueryResponse;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+public class GraphsController extends AbstractController implements GraphsApi {
+
+ private static final PrometheusApiMapper MAPPER = Mappers.getMapper(PrometheusApiMapper.class);
+
+ @Mapper
+ interface PrometheusApiMapper {
+ PrometheusApiQueryResponseDTO fromClientResponse(QueryResponse resp);
+ }
+
+ private final GraphsService graphsService;
+
+ @Override
+ public Mono> getGraphData(String clusterName,
+ Mono graphDataRequestDto,
+ ServerWebExchange exchange) {
+ var context = AccessContext.builder()
+ .cluster(clusterName)
+ .operationName("getGraphData")
+ .build();
+
+ return accessControlService.validateAccess(context)
+ .then(
+ graphDataRequestDto.flatMap(req ->
+ graphsService.getGraphData(
+ getCluster(clusterName),
+ req.getId(),
+ Optional.ofNullable(req.getFrom()).map(OffsetDateTime::toInstant).orElse(null),
+ Optional.ofNullable(req.getTo()).map(OffsetDateTime::toInstant).orElse(null),
+ req.getParameters()
+ ).map(MAPPER::fromClientResponse))
+ .map(ResponseEntity::ok)
+ ).doOnEach(sig -> auditService.audit(context, sig));
+ }
+
+ @Override
+ public Mono> getGraphsList(String clusterName,
+ ServerWebExchange exchange) {
+ var context = AccessContext.builder()
+ .cluster(clusterName)
+ .operationName("getGraphsList")
+ .build();
+
+ var graphs = graphsService.getGraphs(getCluster(clusterName));
+ return accessControlService.validateAccess(context).then(
+ Mono.just(ResponseEntity.ok(new GraphDescriptionsDTO().graphs(graphs.map(this::map).toList()))));
+ }
+
+ private GraphDescriptionDTO map(GraphDescription graph) {
+ return new GraphDescriptionDTO(graph.id())
+ .defaultPeriod(Optional.ofNullable(graph.defaultInterval()).map(Duration::toString).orElse(null))
+ .type(graph.isRange() ? GraphDescriptionDTO.TypeEnum.RANGE : GraphDescriptionDTO.TypeEnum.INSTANT)
+ .parameters(graph.params().stream().map(GraphParameterDTO::new).toList());
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java
new file mode 100644
index 00000000000..1e964909a3f
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java
@@ -0,0 +1,32 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.PrometheusExposeApi;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.StatisticsCache;
+import com.provectus.kafka.ui.service.metrics.prometheus.PrometheusExpose;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+public class PrometheusExposeController extends AbstractController implements PrometheusExposeApi {
+
+ private final StatisticsCache statisticsCache;
+
+ @Override
+ public Mono> getAllMetrics(ServerWebExchange exchange) {
+ return Mono.just(
+ PrometheusExpose.exposeAllMetrics(
+ clustersStorage.getKafkaClusters()
+ .stream()
+ .filter(KafkaCluster::isExposeMetricsViaPrometheusEndpoint)
+ .collect(Collectors.toMap(KafkaCluster::getName, c -> statisticsCache.get(c).getMetrics()))
+ )
+ );
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
index a122a269a4e..7b337f20da7 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
@@ -1,9 +1,12 @@
package com.provectus.kafka.ui.mapper;
+import static io.prometheus.client.Collector.MetricFamilySamples;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.BrokerConfigDTO;
import com.provectus.kafka.ui.model.BrokerDTO;
-import com.provectus.kafka.ui.model.BrokerDiskUsageDTO;
import com.provectus.kafka.ui.model.BrokerMetricsDTO;
import com.provectus.kafka.ui.model.ClusterDTO;
import com.provectus.kafka.ui.model.ClusterFeature;
@@ -14,7 +17,6 @@
import com.provectus.kafka.ui.model.ConnectDTO;
import com.provectus.kafka.ui.model.InternalBroker;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
-import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
import com.provectus.kafka.ui.model.InternalClusterState;
import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica;
@@ -30,10 +32,13 @@
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
-import com.provectus.kafka.ui.service.metrics.RawMetric;
+import com.provectus.kafka.ui.service.metrics.SummarizedMetrics;
+import java.math.BigDecimal;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
@@ -52,21 +57,28 @@ public interface ClusterMapper {
ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
+ @Deprecated
default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
return new ClusterMetricsDTO()
- .items(metrics.getSummarizedMetrics().map(this::convert).collect(Collectors.toList()));
+ .items(convert(new SummarizedMetrics(metrics).asStream()).toList());
}
- private MetricDTO convert(RawMetric rawMetric) {
- return new MetricDTO()
- .name(rawMetric.name())
- .labels(rawMetric.labels())
- .value(rawMetric.value());
+ private Stream convert(Stream metrics) {
+ return metrics
+ .flatMap(m -> m.samples.stream())
+ .map(s ->
+ new MetricDTO()
+ .name(s.name)
+ .labels(IntStream.range(0, s.labelNames.size())
+ .boxed()
+ //collecting to map, keeping order
+ .collect(toMap(s.labelNames::get, s.labelValues::get, (m1, m2) -> null, LinkedHashMap::new)))
+ .value(BigDecimal.valueOf(s.value))
+ );
}
- default BrokerMetricsDTO toBrokerMetrics(List metrics) {
- return new BrokerMetricsDTO()
- .metrics(metrics.stream().map(this::convert).collect(Collectors.toList()));
+ default BrokerMetricsDTO toBrokerMetrics(List metrics) {
+ return new BrokerMetricsDTO().metrics(convert(metrics.stream()).toList());
}
@Mapping(target = "isSensitive", source = "sensitive")
@@ -107,15 +119,7 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {
List toFeaturesEnum(List features);
default List map(Map map) {
- return map.values().stream().map(this::toPartition).collect(Collectors.toList());
- }
-
- default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBrokerDiskUsage) {
- final BrokerDiskUsageDTO brokerDiskUsage = new BrokerDiskUsageDTO();
- brokerDiskUsage.setBrokerId(id);
- brokerDiskUsage.segmentCount((int) internalBrokerDiskUsage.getSegmentCount());
- brokerDiskUsage.segmentSize(internalBrokerDiskUsage.getSegmentSize());
- return brokerDiskUsage;
+ return map.values().stream().map(this::toPartition).collect(toList());
}
static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java
index 3d84aa3ad90..bb37768dcc0 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/DescribeLogDirsMapper.java
@@ -7,6 +7,8 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
@@ -16,7 +18,7 @@
public class DescribeLogDirsMapper {
public List toBrokerLogDirsList(
- Map> logDirsInfo) {
+ Map> logDirsInfo) {
return logDirsInfo.entrySet().stream().map(
mapEntry -> mapEntry.getValue().entrySet().stream()
@@ -26,13 +28,13 @@ public List toBrokerLogDirsList(
}
private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,
- DescribeLogDirsResponse.LogDirInfo logDirInfo) {
+ LogDirDescription logDirInfo) {
BrokersLogdirsDTO result = new BrokersLogdirsDTO();
result.setName(dirName);
- if (logDirInfo.error != null && logDirInfo.error != Errors.NONE) {
- result.setError(logDirInfo.error.message());
+ if (logDirInfo.error() != null) {
+ result.setError(logDirInfo.error().getMessage());
}
- var topics = logDirInfo.replicaInfos.entrySet().stream()
+ var topics = logDirInfo.replicaInfos().entrySet().stream()
.collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream()
.map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue()))
.collect(Collectors.toList());
@@ -41,8 +43,7 @@ private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName,
}
private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
- List> partitions) {
+ List> partitions) {
BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO();
topic.setName(name);
topic.setPartitions(
@@ -54,13 +55,12 @@ private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name,
}
private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition,
- DescribeLogDirsResponse.ReplicaInfo
- replicaInfo) {
+ ReplicaInfo replicaInfo) {
BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO();
logDir.setBroker(broker);
logDir.setPartition(partition);
- logDir.setSize(replicaInfo.size);
- logDir.setOffsetLag(replicaInfo.offsetLag);
+ logDir.setSize(replicaInfo.size());
+ logDir.setOffsetLag(replicaInfo.offsetLag());
return logDir;
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java
index 4a0d1ba0dd1..1a2c96d8c1c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBroker.java
@@ -21,12 +21,12 @@ public class InternalBroker {
public InternalBroker(Node node,
PartitionDistributionStats partitionDistribution,
- Statistics statistics) {
+ Metrics metrics) {
this.id = node.id();
this.host = node.host();
this.port = node.port();
- this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
- this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
+ this.bytesInPerSec = metrics.getIoRates().brokerBytesInPerSec().get(node.id());
+ this.bytesOutPerSec = metrics.getIoRates().brokerBytesOutPerSec().get(node.id());
this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
this.partitions = partitionDistribution.getPartitionsCount().get(node);
this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerDiskUsage.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerDiskUsage.java
deleted file mode 100644
index 104051dc9eb..00000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerDiskUsage.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-public class InternalBrokerDiskUsage {
- private final long segmentCount;
- private final long segmentSize;
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java
deleted file mode 100644
index 17aa8e51312..00000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterMetrics.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import java.math.BigDecimal;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-import lombok.Builder;
-import lombok.Data;
-
-
-@Data
-@Builder(toBuilder = true)
-public class InternalClusterMetrics {
-
- public static InternalClusterMetrics empty() {
- return InternalClusterMetrics.builder()
- .brokers(List.of())
- .topics(Map.of())
- .status(ServerStatusDTO.OFFLINE)
- .internalBrokerMetrics(Map.of())
- .metrics(List.of())
- .version("unknown")
- .build();
- }
-
- private final String version;
-
- private final ServerStatusDTO status;
- private final Throwable lastKafkaException;
-
- private final int brokerCount;
- private final int activeControllers;
- private final List brokers;
-
- private final int topicCount;
- private final Map topics;
-
- // partitions stats
- private final int underReplicatedPartitionCount;
- private final int onlinePartitionCount;
- private final int offlinePartitionCount;
- private final int inSyncReplicasCount;
- private final int outOfSyncReplicasCount;
-
- // log dir stats
- @Nullable // will be null if log dir collection disabled
- private final Map internalBrokerDiskUsage;
-
- // metrics from metrics collector
- private final BigDecimal bytesInPerSec;
- private final BigDecimal bytesOutPerSec;
- private final Map internalBrokerMetrics;
- private final List metrics;
-
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
index 28e9a7413a3..201faa1f89a 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
@@ -36,39 +36,42 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
.message(e.getMessage())
.stackTrace(Throwables.getStackTraceAsString(e)))
.orElse(null);
- topicCount = statistics.getTopicDescriptions().size();
+ topicCount = (int) statistics.topicDescriptions().count();
brokerCount = statistics.getClusterDescription().getNodes().size();
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
.map(Node::id)
.orElse(null);
version = statistics.getVersion();
- if (statistics.getLogDirInfo() != null) {
- diskUsage = statistics.getLogDirInfo().getBrokerStats().entrySet().stream()
- .map(e -> new BrokerDiskUsageDTO()
- .brokerId(e.getKey())
- .segmentSize(e.getValue().getSegmentSize())
- .segmentCount(e.getValue().getSegmentsCount()))
- .collect(Collectors.toList());
- }
+ diskUsage = statistics.getClusterState().getNodesStates().values().stream()
+ .filter(n -> n.segmentStats() != null)
+ .map(n -> new BrokerDiskUsageDTO()
+ .brokerId(n.id())
+ .segmentSize(n.segmentStats().getSegmentSize())
+ .segmentCount(n.segmentStats().getSegmentsCount()))
+ .collect(Collectors.toList());
features = statistics.getFeatures();
bytesInPerSec = statistics
.getMetrics()
- .getBrokerBytesInPerSec()
- .values().stream()
+ .getIoRates()
+ .brokerBytesInPerSec()
+ .values()
+ .stream()
.reduce(BigDecimal::add)
.orElse(null);
bytesOutPerSec = statistics
.getMetrics()
- .getBrokerBytesOutPerSec()
- .values().stream()
+ .getIoRates()
+ .brokerBytesOutPerSec()
+ .values()
+ .stream()
.reduce(BigDecimal::add)
.orElse(null);
- var partitionsStats = new PartitionsStats(statistics.getTopicDescriptions().values());
+ var partitionsStats = new PartitionsStats(statistics.topicDescriptions().toList());
onlinePartitionCount = partitionsStats.getOnlinePartitionCount();
offlinePartitionCount = partitionsStats.getOfflinePartitionCount();
inSyncReplicasCount = partitionsStats.getInSyncReplicasCount();
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java
index 34ec3d59e3e..fa9b0f16169 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java
@@ -3,14 +3,17 @@
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summarizingLong;
-import static java.util.stream.Collectors.toList;
+import jakarta.annotation.Nullable;
+import java.util.HashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.RequiredArgsConstructor;
import lombok.Value;
+import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
@@ -19,32 +22,39 @@
public class InternalLogDirStats {
@Value
+ @RequiredArgsConstructor
public static class SegmentStats {
- long segmentSize;
- int segmentsCount;
+ Long segmentSize;
+ Integer segmentsCount;
- public SegmentStats(LongSummaryStatistics s) {
- segmentSize = s.getSum();
- segmentsCount = (int) s.getCount();
+ private SegmentStats(LongSummaryStatistics s) {
+ this(s.getSum(), (int) s.getCount());
}
}
+ public record LogDirSpaceStats(@Nullable Long totalBytes,
+ @Nullable Long usableBytes,
+ Map totalPerDir,
+ Map usablePerDir) {
+ }
+
Map partitionsStats;
Map topicStats;
Map brokerStats;
+ Map brokerDirsStats;
public static InternalLogDirStats empty() {
return new InternalLogDirStats(Map.of());
}
- public InternalLogDirStats(Map> log) {
+ public InternalLogDirStats(Map> logsInfo) {
final List> topicPartitions =
- log.entrySet().stream().flatMap(b ->
+ logsInfo.entrySet().stream().flatMap(b ->
b.getValue().entrySet().stream().flatMap(topicMap ->
- topicMap.getValue().replicaInfos.entrySet().stream()
- .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
+ topicMap.getValue().replicaInfos().entrySet().stream()
+ .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size()))
)
- ).collect(toList());
+ ).toList();
partitionsStats = topicPartitions.stream().collect(
groupingBy(
@@ -64,5 +74,34 @@ public InternalLogDirStats(Map calculateSpaceStats(
+ Map> logsInfo) {
+
+ var stats = new HashMap();
+ logsInfo.forEach((brokerId, logDirStats) -> {
+ Map totalBytes = new HashMap<>();
+ Map usableBytes = new HashMap<>();
+ logDirStats.forEach((logDir, descr) -> {
+ if (descr.error() != null) {
+ return;
+ }
+ descr.totalBytes().ifPresent(b -> totalBytes.merge(logDir, b, Long::sum));
+ descr.usableBytes().ifPresent(b -> usableBytes.merge(logDir, b, Long::sum));
+ });
+ stats.put(
+ brokerId,
+ new LogDirSpaceStats(
+ totalBytes.isEmpty() ? null : totalBytes.values().stream().mapToLong(i -> i).sum(),
+ usableBytes.isEmpty() ? null : usableBytes.values().stream().mapToLong(i -> i).sum(),
+ totalBytes,
+ usableBytes
+ )
+ );
+ });
+ return stats;
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java
index 9fb54a300ed..742320e53bc 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java
@@ -4,6 +4,7 @@
import com.google.common.collect.Table;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import lombok.Value;
import org.apache.kafka.common.TopicPartition;
@@ -30,4 +31,11 @@ public Optional get(String topic, int partition) {
return Optional.ofNullable(offsets.get(topic, partition));
}
+ public Map topicOffsets(String topic, boolean earliest) {
+ return offsets.row(topic)
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> earliest ? e.getValue().earliest : e.getValue().getLatest()));
+ }
+
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSegmentSizeDto.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSegmentSizeDto.java
deleted file mode 100644
index a34db011321..00000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSegmentSizeDto.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import java.util.Map;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-public class InternalSegmentSizeDto {
-
- private final Map internalTopicWithSegmentSize;
- private final InternalClusterMetrics clusterMetricsWithSegmentSize;
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
index 43a6012d215..5a3aa149d65 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java
@@ -1,23 +1,22 @@
package com.provectus.kafka.ui.model;
-import com.provectus.kafka.ui.config.ClustersProperties;
+import static com.provectus.kafka.ui.model.InternalLogDirStats.SegmentStats;
+
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Data;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.common.TopicPartition;
@Data
@Builder(toBuilder = true)
public class InternalTopic {
- ClustersProperties clustersProperties;
-
// from TopicDescription
private final String name;
private final boolean internal;
@@ -44,7 +43,8 @@ public static InternalTopic from(TopicDescription topicDescription,
List configs,
InternalPartitionsOffsets partitionsOffsets,
Metrics metrics,
- InternalLogDirStats logDirInfo,
+ @Nullable SegmentStats segmentStats,
+ @Nullable Map partitionsSegmentStats,
@Nullable String internalTopicPrefix) {
var topic = InternalTopic.builder();
@@ -81,13 +81,12 @@ public static InternalTopic from(TopicDescription topicDescription,
partitionDto.offsetMax(offsets.getLatest());
});
- var segmentStats =
- logDirInfo.getPartitionsStats().get(
- new TopicPartition(topicDescription.name(), partition.partition()));
- if (segmentStats != null) {
- partitionDto.segmentCount(segmentStats.getSegmentsCount());
- partitionDto.segmentSize(segmentStats.getSegmentSize());
- }
+ Optional.ofNullable(partitionsSegmentStats)
+ .flatMap(s -> Optional.ofNullable(s.get(partition.partition())))
+ .ifPresent(stats -> {
+ partitionDto.segmentCount(stats.getSegmentsCount());
+ partitionDto.segmentSize(stats.getSegmentSize());
+ });
return partitionDto.build();
})
@@ -108,14 +107,14 @@ public static InternalTopic from(TopicDescription topicDescription,
: topicDescription.partitions().get(0).replicas().size()
);
- var segmentStats = logDirInfo.getTopicStats().get(topicDescription.name());
- if (segmentStats != null) {
- topic.segmentCount(segmentStats.getSegmentsCount());
- topic.segmentSize(segmentStats.getSegmentSize());
- }
+ Optional.ofNullable(segmentStats)
+ .ifPresent(stats -> {
+ topic.segmentCount(stats.getSegmentsCount());
+ topic.segmentSize(stats.getSegmentSize());
+ });
- topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()));
- topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()));
+ topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
+ topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
topic.topicConfigs(
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
index 1e2903dbcc9..fc797e3811a 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
@@ -5,6 +5,7 @@
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import com.provectus.kafka.ui.service.masking.DataMasking;
+import com.provectus.kafka.ui.service.metrics.scrape.MetricsScrapping;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.util.ReactiveFailover;
import java.util.Map;
@@ -13,6 +14,7 @@
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import prometheus.query.api.PrometheusClientApi;
@Data
@Builder(toBuilder = true)
@@ -25,10 +27,12 @@ public class KafkaCluster {
private final String bootstrapServers;
private final Properties properties;
private final boolean readOnly;
- private final MetricsConfig metricsConfig;
+ private final boolean exposeMetricsViaPrometheusEndpoint;
private final DataMasking masking;
private final PollingSettings pollingSettings;
private final ReactiveFailover schemaRegistryClient;
private final Map> connectsClients;
private final ReactiveFailover ksqlClient;
+ private final MetricsScrapping metricsScrapping;
+ private final ReactiveFailover prometheusStorageClient;
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
index 02bfe6dea13..c0f9737da0f 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
@@ -1,13 +1,11 @@
package com.provectus.kafka.ui.model;
-import static java.util.stream.Collectors.toMap;
+import static io.prometheus.client.Collector.MetricFamilySamples;
-import com.provectus.kafka.ui.service.metrics.RawMetric;
+import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetrics;
import java.math.BigDecimal;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.stream.Stream;
import lombok.Builder;
import lombok.Value;
@@ -16,28 +14,32 @@
@Value
public class Metrics {
- Map brokerBytesInPerSec;
- Map brokerBytesOutPerSec;
- Map topicBytesInPerSec;
- Map topicBytesOutPerSec;
- Map> perBrokerMetrics;
+ IoRates ioRates;
+ InferredMetrics inferredMetrics;
+ Map> perBrokerScrapedMetrics;
public static Metrics empty() {
return Metrics.builder()
- .brokerBytesInPerSec(Map.of())
- .brokerBytesOutPerSec(Map.of())
- .topicBytesInPerSec(Map.of())
- .topicBytesOutPerSec(Map.of())
- .perBrokerMetrics(Map.of())
+ .ioRates(IoRates.empty())
+ .perBrokerScrapedMetrics(Map.of())
+ .inferredMetrics(InferredMetrics.empty())
.build();
}
- public Stream getSummarizedMetrics() {
- return perBrokerMetrics.values().stream()
- .flatMap(Collection::stream)
- .collect(toMap(RawMetric::identityKey, m -> m, (m1, m2) -> m1.copyWithValue(m1.value().add(m2.value()))))
- .values()
- .stream();
+ @Builder
+ public record IoRates(Map brokerBytesInPerSec,
+ Map brokerBytesOutPerSec,
+ Map topicBytesInPerSec,
+ Map topicBytesOutPerSec) {
+
+ static IoRates empty() {
+ return IoRates.builder()
+ .brokerBytesOutPerSec(Map.of())
+ .brokerBytesInPerSec(Map.of())
+ .topicBytesOutPerSec(Map.of())
+ .topicBytesInPerSec(Map.of())
+ .build();
+ }
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java
deleted file mode 100644
index d3551443437..00000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class MetricsConfig {
- public static final String JMX_METRICS_TYPE = "JMX";
- public static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS";
-
- private final String type;
- private final Integer port;
- private final boolean ssl;
- private final String username;
- private final String password;
- private final String keystoreLocation;
- private final String keystorePassword;
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java
new file mode 100644
index 00000000000..2e64b0253db
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java
@@ -0,0 +1,46 @@
+package com.provectus.kafka.ui.model;
+
+import static com.provectus.kafka.ui.config.ClustersProperties.KeystoreConfig;
+import static com.provectus.kafka.ui.config.ClustersProperties.TruststoreConfig;
+
+import com.provectus.kafka.ui.config.ClustersProperties;
+import jakarta.annotation.Nullable;
+import java.util.Objects;
+import java.util.Optional;
+import lombok.Builder;
+import lombok.Value;
+
+@Value
+@Builder
+public class MetricsScrapeProperties {
+ public static final String JMX_METRICS_TYPE = "JMX";
+ public static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS";
+
+ Integer port;
+ boolean ssl;
+ String username;
+ String password;
+
+ @Nullable
+ KeystoreConfig keystoreConfig;
+
+ @Nullable
+ TruststoreConfig truststoreConfig;
+
+ public static MetricsScrapeProperties create(ClustersProperties.Cluster cluster) {
+ var metrics = Objects.requireNonNull(cluster.getMetrics());
+ return MetricsScrapeProperties.builder()
+ .port(metrics.getPort())
+ .ssl(Optional.ofNullable(metrics.getSsl()).orElse(false))
+ .username(metrics.getUsername())
+ .password(metrics.getPassword())
+ .truststoreConfig(cluster.getSsl())
+ .keystoreConfig(
+ metrics.getKeystoreLocation() != null
+ ? new KeystoreConfig(metrics.getKeystoreLocation(), metrics.getKeystorePassword())
+ : null
+ )
+ .build();
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java
index 46efc670008..3ba498618be 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionDistributionStats.java
@@ -1,14 +1,17 @@
package com.provectus.kafka.ui.model;
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
@@ -29,15 +32,19 @@ public class PartitionDistributionStats {
private final boolean skewCanBeCalculated;
public static PartitionDistributionStats create(Statistics stats) {
- return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
+ return create(
+ stats.topicDescriptions().toList(),
+ MIN_PARTITIONS_FOR_SKEW_CALCULATION
+ );
}
- static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
+ static PartitionDistributionStats create(List topicDescriptions,
+ int minPartitionsForSkewCalculation) {
var partitionLeaders = new HashMap();
var partitionsReplicated = new HashMap();
var isr = new HashMap();
int partitionsCnt = 0;
- for (TopicDescription td : stats.getTopicDescriptions().values()) {
+ for (TopicDescription td : topicDescriptions) {
for (TopicPartitionInfo tp : td.partitions()) {
partitionsCnt++;
tp.replicas().forEach(r -> incr(partitionsReplicated, r));
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
index e70547f1437..998ab2891e8 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
@@ -1,9 +1,11 @@
package com.provectus.kafka.ui.model;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Stream;
import lombok.Builder;
import lombok.Value;
import org.apache.kafka.clients.admin.ConfigEntry;
@@ -18,9 +20,7 @@ public class Statistics {
List features;
ReactiveAdminClient.ClusterDescription clusterDescription;
Metrics metrics;
- InternalLogDirStats logDirInfo;
- Map topicDescriptions;
- Map> topicConfigs;
+ ScrapedClusterState clusterState;
public static Statistics empty() {
return builder()
@@ -30,9 +30,12 @@ public static Statistics empty() {
.clusterDescription(
new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of()))
.metrics(Metrics.empty())
- .logDirInfo(InternalLogDirStats.empty())
- .topicDescriptions(Map.of())
- .topicConfigs(Map.of())
+ .clusterState(ScrapedClusterState.empty())
.build();
}
+
+ public Stream topicDescriptions() {
+ return clusterState.getTopicStates().values().stream().map(ScrapedClusterState.TopicState::description);
+ }
+
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
index ff86d2be0f7..b678b7e95fa 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
@@ -1,5 +1,7 @@
package com.provectus.kafka.ui.service;
+import static io.prometheus.client.Collector.MetricFamilySamples;
+
import com.provectus.kafka.ui.exception.InvalidRequestApiException;
import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
import com.provectus.kafka.ui.exception.NotFoundException;
@@ -11,7 +13,6 @@
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionDistributionStats;
-import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -21,13 +22,13 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -72,7 +73,7 @@ public Flux getBrokers(KafkaCluster cluster) {
.get(cluster)
.flatMap(ReactiveAdminClient::describeCluster)
.map(description -> description.getNodes().stream()
- .map(node -> new InternalBroker(node, partitionsDistribution, stats))
+ .map(node -> new InternalBroker(node, partitionsDistribution, stats.getMetrics()))
.collect(Collectors.toList()))
.flatMapMany(Flux::fromIterable);
}
@@ -110,7 +111,7 @@ public Mono updateBrokerConfigByName(KafkaCluster cluster,
.doOnError(e -> log.error("Unexpected error", e));
}
- private Mono