BE: Chore: use record classes #703

Draft · wants to merge 1 commit into base: main
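Every file in this diff applies the same pattern: Lombok @Value / @Data holder classes become Java records, and call sites move from Lombok's getX() accessors to the record-generated x() accessors. A minimal before/after sketch of that pattern, with illustrative names rather than code from this PR:

import lombok.Value;

// Before: Lombok generates the constructor, getters, equals/hashCode/toString.
@Value
class OffsetsHolder {
  Long earliest; // read as holder.getEarliest()
  Long latest;   // read as holder.getLatest()
}

// After: a record generates the same members, but the accessors
// are named after the components: earliest(), latest().
record OffsetsRecord(Long earliest, Long latest) {
}

class AccessorDemo {
  public static void main(String[] args) {
    var offsets = new OffsetsRecord(0L, 42L);
    System.out.println(offsets.earliest()); // was getEarliest() with Lombok
  }
}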
@@ -1,9 +1,6 @@
package io.kafbat.ui.model;

import com.google.common.base.Throwables;
-import io.kafbat.ui.model.BrokerDiskUsageDTO;
-import io.kafbat.ui.model.MetricsCollectionErrorDTO;
-import io.kafbat.ui.model.ServerStatusDTO;
import java.math.BigDecimal;
import java.util.List;
import java.util.Optional;
@@ -40,8 +37,8 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
.stackTrace(Throwables.getStackTraceAsString(e)))
.orElse(null);
topicCount = statistics.getTopicDescriptions().size();
-brokerCount = statistics.getClusterDescription().getNodes().size();
-activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
+brokerCount = statistics.getClusterDescription().nodes().size();
+activeControllers = Optional.ofNullable(statistics.getClusterDescription().controller())
.map(Node::id)
.orElse(null);
version = statistics.getVersion();
@@ -4,16 +4,11 @@
import com.google.common.collect.Table;
import java.util.Map;
import java.util.Optional;
-import lombok.Value;
import org.apache.kafka.common.TopicPartition;


public class InternalPartitionsOffsets {

-@Value
-public static class Offsets {
-Long earliest;
-Long latest;
+public record Offsets(Long earliest, Long latest) {
}

private final Table<String, Integer, Offsets> offsets = HashBasedTable.create();
9 changes: 1 addition & 8 deletions api/src/main/java/io/kafbat/ui/model/InternalReplica.java
@@ -1,14 +1,7 @@
package io.kafbat.ui.model;

-import lombok.Builder;
-import lombok.Data;
-import lombok.RequiredArgsConstructor;

-@Data
-@Builder
-@RequiredArgsConstructor
-public class InternalReplica {
-private final int broker;
-private final boolean leader;
-private final boolean inSync;
+public record InternalReplica(int broker, boolean leader, boolean inSync) {
}
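One behavioral note on this file: the old class carried @Builder, which the record version drops, so construction now goes through the canonical constructor. A hedged sketch of the call-site shape after the change (the PR's actual call sites are not shown in this diff):

import io.kafbat.ui.model.InternalReplica;

class ReplicaConstructionDemo {
  InternalReplica fromNode(int brokerId, boolean isLeader, boolean inSync) {
    // Canonical record constructor replaces any builder()...build() chain.
    // If builder-style construction is still wanted, Lombok's @Builder can be
    // applied to a record declaration as well (assumes Lombok stays available).
    return new InternalReplica(brokerId, isLeader, inSync);
  }
}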
4 changes: 2 additions & 2 deletions api/src/main/java/io/kafbat/ui/model/InternalTopic.java
@@ -77,8 +77,8 @@ public static InternalTopic from(TopicDescription topicDescription,

partitionsOffsets.get(topicDescription.name(), partition.partition())
.ifPresent(offsets -> {
-partitionDto.offsetMin(offsets.getEarliest());
-partitionDto.offsetMax(offsets.getLatest());
+partitionDto.offsetMin(offsets.earliest());
+partitionDto.offsetMax(offsets.latest());
});

var segmentStats =
@@ -79,14 +79,14 @@ private void fillKey(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec)
}
try {
var deserResult = keyDeserializer.deserialize(new RecordHeadersImpl(), rec.key().get());
-message.setKey(deserResult.getResult());
+message.setKey(deserResult.result());
message.setKeySerde(keySerdeName);
-message.setKeyDeserializeProperties(deserResult.getAdditionalProperties());
+message.setKeyDeserializeProperties(deserResult.additionalProperties());
} catch (Exception e) {
log.trace("Error deserializing key for key topic: {}, partition {}, offset {}, with serde {}",
rec.topic(), rec.partition(), rec.offset(), keySerdeName, e);
var deserResult = fallbackKeyDeserializer.deserialize(new RecordHeadersImpl(), rec.key().get());
-message.setKey(deserResult.getResult());
+message.setKey(deserResult.result());
message.setKeySerde(fallbackSerdeName);
}
}
@@ -98,15 +98,15 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec
try {
var deserResult = valueDeserializer.deserialize(
new RecordHeadersImpl(rec.headers()), rec.value().get());
-message.setContent(deserResult.getResult());
+message.setContent(deserResult.result());
message.setValueSerde(valueSerdeName);
-message.setValueDeserializeProperties(deserResult.getAdditionalProperties());
+message.setValueDeserializeProperties(deserResult.additionalProperties());
} catch (Exception e) {
log.trace("Error deserializing key for value topic: {}, partition {}, offset {}, with serde {}",
rec.topic(), rec.partition(), rec.offset(), valueSerdeName, e);
var deserResult = fallbackValueDeserializer.deserialize(
new RecordHeadersImpl(rec.headers()), rec.value().get());
-message.setContent(deserResult.getResult());
+message.setContent(deserResult.result());
message.setValueSerde(fallbackSerdeName);
}
}
@@ -18,15 +18,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
-import lombok.Value;


class CustomSerdeLoader {

-@Value
-static class CustomSerde {
-Serde serde;
-ClassLoader classLoader;
+record CustomSerde(Serde serde, ClassLoader classLoader) {
}

// serde location -> classloader
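A small detail in this conversion: the old declaration was an explicitly static nested class, while the record drops the modifier because a nested record is implicitly static. A minimal sketch of that rule, with illustrative names:

class LoaderSketch {
  // A nested record is implicitly static, so it never captures the enclosing
  // instance; the old declaration needed an explicit "static class" for this.
  record Handle(ClassLoader classLoader) {
  }

  Handle load() {
    return new Handle(getClass().getClassLoader());
  }
}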
@@ -266,10 +266,10 @@ private SerdeInstance loadAndInitCustomSerde(ClustersProperties.SerdeConfig serd
serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
return new SerdeInstance(
serdeConfig.getName(),
-loaded.getSerde(),
+loaded.serde(),
nullablePattern(serdeConfig.getTopicKeysPattern()),
nullablePattern(serdeConfig.getTopicValuesPattern()),
-loaded.getClassLoader()
+loaded.classLoader()
);
}

6 changes: 3 additions & 3 deletions api/src/main/java/io/kafbat/ui/service/BrokerService.java
@@ -52,7 +52,7 @@ private Mono<List<ConfigEntry>> loadBrokersConfig(
}

private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
-if (statisticsCache.get(cluster).getClusterDescription().getNodes()
+if (statisticsCache.get(cluster).getClusterDescription().nodes()
.stream().noneMatch(node -> node.id() == brokerId)) {
return Flux.error(
new NotFoundException(String.format("Broker with id %s not found", brokerId)));
@@ -70,7 +70,7 @@ public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
return adminClientService
.get(cluster)
.flatMap(ReactiveAdminClient::describeCluster)
-.map(description -> description.getNodes().stream()
+.map(description -> description.nodes().stream()
.map(node -> new InternalBroker(node, partitionsDistribution, stats))
.collect(Collectors.toList()))
.flatMapMany(Flux::fromIterable);
@@ -113,7 +113,7 @@ private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getC
KafkaCluster cluster, List<Integer> reqBrokers) {
return adminClientService.get(cluster)
.flatMap(admin -> {
-List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
+List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().nodes()
.stream()
.map(Node::id)
.collect(Collectors.toList());
@@ -143,8 +143,8 @@ private SerdeDescriptionDTO toDto(SerdeInstance serdeInstance,
return new SerdeDescriptionDTO()
.name(serdeInstance.getName())
.description(serdeInstance.description().orElse(null))
-.schema(schemaOpt.map(SchemaDescription::getSchema).orElse(null))
-.additionalProperties(schemaOpt.map(SchemaDescription::getAdditionalProperties).orElse(null))
+.schema(schemaOpt.map(SchemaDescription::schema).orElse(null))
+.additionalProperties(schemaOpt.map(SchemaDescription::additionalProperties).orElse(null))
.preferred(preferred);
}

@@ -61,7 +61,7 @@ private Mono<ClusterFeature> quotaManagement(ReactiveAdminClient adminClient) {
}

private Mono<ClusterFeature> aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) {
-var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
+var authorizedOps = Optional.ofNullable(clusterDescription.authorizedOperations()).orElse(Set.of());
boolean canEdit = aclViewEnabled(adminClient)
&& (authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER));
return canEdit
15 changes: 5 additions & 10 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
@@ -124,14 +124,9 @@ static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersio
}
}

-@Value
-public static class ClusterDescription {
-@Nullable
-Node controller;
-String clusterId;
-Collection<Node> nodes;
-@Nullable // null, if ACL is disabled
-Set<AclOperation> authorizedOperations;
+public record ClusterDescription(@Nullable Node controller, String clusterId, Collection<Node> nodes,
+// null, if ACL is disabled
+@Nullable Set<AclOperation> authorizedOperations) {
}

@Builder
@@ -147,7 +142,7 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
// choosing node from which we will get configs (starting with controller)
var targetNodeId = Optional.ofNullable(desc.controller)
.map(Node::id)
-.orElse(desc.getNodes().iterator().next().id());
+.orElse(desc.nodes().iterator().next().id());
return loadBrokersConfig(ac, List.of(targetNodeId))
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
.flatMap(configs -> {
@@ -391,7 +386,7 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v

public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
return describeCluster()
-.map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
+.map(d -> d.nodes().stream().map(Node::id).collect(toList()))
.flatMap(this::describeLogDirs);
}
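ClusterDescription is the one record in this PR with @Nullable components. Records generate no null checks of their own, which is why the call sites keep guarding with Optional.ofNullable. A minimal sketch of that idiom (types and names are illustrative, not this class):

import java.util.Optional;

// controllerId stands in for the @Nullable Node controller component above.
record ControllerInfo(String clusterId, String controllerId) {
}

class NullGuardDemo {
  static String controllerOrUnknown(ControllerInfo info) {
    // Same guard as the PR's call sites: wrap the nullable accessor.
    return Optional.ofNullable(info.controllerId()).orElse("unknown");
  }
}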

@@ -37,10 +37,10 @@ public Mono<Statistics> updateCache(KafkaCluster c) {
private Mono<Statistics> getStatistics(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(ac ->
ac.describeCluster().flatMap(description ->
-ac.updateInternalStats(description.getController()).then(
+ac.updateInternalStats(description.controller()).then(
Mono.zip(
List.of(
-metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
+metricsCollector.getBrokerMetrics(cluster, description.nodes()),
getLogDirInfo(description, ac),
featureService.getAvailableFeatures(ac, cluster, description),
loadTopicConfigs(cluster),
@@ -64,7 +64,7 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
}

private Mono<InternalLogDirStats> getLogDirInfo(ClusterDescription desc, ReactiveAdminClient ac) {
-var brokerIds = desc.getNodes().stream().map(Node::id).collect(Collectors.toSet());
+var brokerIds = desc.nodes().stream().map(Node::id).collect(Collectors.toSet());
return ac.describeLogDirs(brokerIds).map(InternalLogDirStats::new);
}

6 changes: 3 additions & 3 deletions api/src/main/java/io/kafbat/ui/service/TopicsService.java
@@ -253,7 +253,7 @@ public Mono<ReplicationFactorChangeResponseDTO> changeReplicationFactor(
Integer actual = topic.getReplicationFactor();
Integer requested = replicationFactorChange.getTotalReplicationFactor();
Integer brokersCount = statisticsCache.get(cluster).getClusterDescription()
-.getNodes().size();
+.nodes().size();

if (requested.equals(actual)) {
return Mono.error(
@@ -361,14 +361,14 @@ private Map<Integer, List<Integer>> getCurrentAssignment(InternalTopic topic) {
.collect(toMap(
InternalPartition::getPartition,
p -> p.getReplicas().stream()
-.map(InternalReplica::getBroker)
+.map(InternalReplica::broker)
.collect(toList())
));
}

private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
Map<Integer, List<Integer>> currentAssignment) {
-Map<Integer, Integer> result = statisticsCache.get(cluster).getClusterDescription().getNodes()
+Map<Integer, Integer> result = statisticsCache.get(cluster).getClusterDescription().nodes()
.stream()
.map(Node::id)
.collect(toMap(
@@ -59,10 +59,7 @@ public Optional<JsonNode> getColumnValue(List<JsonNode> row, String column) {
}
}

-@Value
-private static class KsqlRequest {
-String ksql;
-Map<String, String> streamsProperties;
+private record KsqlRequest(String ksql, Map<String, String> streamsProperties) {
}

//--------------------------------------------------------------------------------------------
@@ -172,7 +169,7 @@ public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamPr
if (parsedStatements.isEmpty()) {
return errorTableFlux("Sql statement is invalid or unsupported");
}
-var statements = parsedStatements.get().getStatements();
+var statements = parsedStatements.get().statements();
if (statements.size() > 1) {
return errorTableFlux("Only single statement supported now");
}
@@ -6,7 +6,6 @@
import ksql.KsqlGrammarLexer;
import ksql.KsqlGrammarParser;
import lombok.RequiredArgsConstructor;
-import lombok.Value;
import lombok.experimental.Delegate;
import org.antlr.v4.runtime.BaseErrorListener;
import org.antlr.v4.runtime.CharStream;
@@ -22,9 +21,7 @@ class KsqlGrammar {
private KsqlGrammar() {
}

-@Value
-static class KsqlStatements {
-List<KsqlGrammarParser.SingleStatementContext> statements;
+record KsqlStatements(List<KsqlGrammarParser.SingleStatementContext> statements) {
}

// returns Empty if no valid statements found
@@ -20,11 +20,7 @@
@Service
public class KsqlServiceV2 {

-@lombok.Value
-private static class KsqlExecuteCommand {
-KafkaCluster cluster;
-String ksql;
-Map<String, String> streamProperties;
+private record KsqlExecuteCommand(KafkaCluster cluster, String ksql, Map<String, String> streamProperties) {
}

private final Cache<String, KsqlExecuteCommand> registeredCommands =
@@ -22,15 +22,7 @@ public class DataMasking {

private static final JsonMapper JSON_MAPPER = new JsonMapper();

-@Value
-static class Mask {
-@Nullable
-Pattern topicKeysPattern;
-@Nullable
-Pattern topicValuesPattern;
-
-MaskingPolicy policy;
-
+record Mask(@Nullable Pattern topicKeysPattern, @Nullable Pattern topicValuesPattern, MaskingPolicy policy) {
boolean shouldBeApplied(String topic, Serde.Target target) {
return target == Serde.Target.KEY
? topicKeysPattern != null && topicKeysPattern.matcher(topic).matches()
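Worth noting about the Mask conversion above: a record body can still declare instance methods, so shouldBeApplied survives the rewrite unchanged; only the state declaration and constructor were replaced. A compact sketch of a record mixing components with behavior (names illustrative):

import java.util.regex.Pattern;

record TopicFilter(Pattern keysPattern) {
  // An ordinary instance method alongside the generated members.
  boolean matches(String topic) {
    return keysPattern != null && keysPattern.matcher(topic).matches();
  }
}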
24 changes: 1 addition & 23 deletions api/src/main/java/io/kafbat/ui/service/metrics/RawMetric.java
@@ -27,29 +27,7 @@ static RawMetric create(String name, Map<String, String> labels, BigDecimal valu
return new SimpleMetric(name, labels, value);
}

-@AllArgsConstructor
-@EqualsAndHashCode
-@ToString
-class SimpleMetric implements RawMetric {
-
-private final String name;
-private final Map<String, String> labels;
-private final BigDecimal value;
-
-@Override
-public String name() {
-return name;
-}
-
-@Override
-public Map<String, String> labels() {
-return labels;
-}
-
-@Override
-public BigDecimal value() {
-return value;
-}
+record SimpleMetric(String name, Map<String, String> labels, BigDecimal value) implements RawMetric {

@Override
public RawMetric copyWithValue(BigDecimal newValue) {
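This file shows the neatest property of the conversion: the record-generated accessors name(), labels(), and value() have exactly the signatures of the RawMetric interface methods, so the three hand-written overrides disappear and only copyWithValue still needs a body. A minimal sketch of the mechanism, with an illustrative interface in place of RawMetric:

import java.math.BigDecimal;

interface MetricSketch {
  String name();
  BigDecimal value();
}

// The generated component accessors satisfy the interface methods
// directly; no explicit @Override methods are needed.
record SimpleMetricSketch(String name, BigDecimal value) implements MetricSketch {
  SimpleMetricSketch copyWithValue(BigDecimal newValue) {
    return new SimpleMetricSketch(name, newValue);
  }
}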
@@ -27,8 +27,8 @@ public JsonSchema convert(URI basePath, Schema schema) {

if (type.getType().equals(JsonType.Type.OBJECT)) {
final ObjectFieldSchema objectRoot = (ObjectFieldSchema) root;
-builder.properties(objectRoot.getProperties());
-builder.required(objectRoot.getRequired());
+builder.properties(objectRoot.properties());
+builder.required(objectRoot.required());
}

return builder.build();