Commit
Merge remote-tracking branch 'origin/bugfix/fix_nullable_enum_avro_ser' into bugfix/fix_nullable_enum_avro_ser
DimaVilda committed Dec 22, 2024
2 parents 158104a + 2fce792 commit 9b27e8d
Showing 61 changed files with 325 additions and 199 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backend_tests.yml
@@ -28,7 +28,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

2 changes: 1 addition & 1 deletion .github/workflows/branch-deploy.yml
@@ -29,7 +29,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'
- name: Build
2 changes: 1 addition & 1 deletion .github/workflows/build-public-image.yml
@@ -27,7 +27,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'
- name: Build
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
@@ -45,7 +45,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

2 changes: 1 addition & 1 deletion .github/workflows/cve_checks.yml
@@ -20,7 +20,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

4 changes: 2 additions & 2 deletions .github/workflows/e2e-run.yml
@@ -30,7 +30,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

@@ -72,7 +72,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

2 changes: 1 addition & 1 deletion .github/workflows/frontend_tests.yml
@@ -23,7 +23,7 @@ jobs:

- uses: pnpm/[email protected]
with:
version: 9.11.0
version: 9.15.0

- name: Install node
uses: actions/[email protected]
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -28,7 +28,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

2 changes: 1 addition & 1 deletion .github/workflows/release-serde-api.yml
@@ -22,7 +22,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: "17"
java-version: "21"
distribution: "zulu"
cache: "maven"

2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -28,7 +28,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

2 changes: 1 addition & 1 deletion .github/workflows/separate_env_public_create.yml
@@ -29,7 +29,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'
- name: Build
2 changes: 1 addition & 1 deletion README.md
@@ -15,7 +15,7 @@ Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.

<p align="center">
<a href="https://ui.docs.kafbat.io/">Documentation</a> •
<a href="https://ui.docs.kafbat.io/configuration/quick-start">Quick Start</a> •
<a href="https://ui.docs.kafbat.io/quick-start/demo-run">Quick Start</a> •
<a href="https://discord.gg/4DWzD7pGE5">Community</a>
<br/>
<a href="https://aws.amazon.com/marketplace/pp/{replaceMe}">AWS Marketplace</a> •
4 changes: 2 additions & 2 deletions api/Dockerfile
@@ -1,7 +1,7 @@
# The tag is ignored when a sha is included but the reason to add it are:
# The tag is ignored when a sha is included but the reason to add it are:
# 1. Self Documentation: It is difficult to find out what the expected tag is given a sha alone
# 2. Helps dependabot during discovery of upgrades
FROM azul/zulu-openjdk-alpine:17-jre-headless-latest@sha256:af4df00adaec356d092651af50d9e80fd179f96722d267e79acb564aede10fda
FROM azul/zulu-openjdk-alpine:21.0.5-jre-headless@sha256:842b23baf96980437281b7419af970238b4c1c3109e50beac5299cdc278291d7

RUN apk add --no-cache \
# snappy codec
2 changes: 1 addition & 1 deletion api/pom.xml
@@ -97,7 +97,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.13.3</version>
<version>1.14.2</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java
@@ -1,5 +1,6 @@
package io.kafbat.ui.client;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.ApiClient;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
@@ -14,9 +15,11 @@
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.util.WebClientConfigurator;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
@@ -58,10 +61,24 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {

private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
Mono.error(new ValidationException("Invalid configuration")))
.onErrorResume(WebClientResponseException.InternalServerError.class, e ->
Mono.error(new ValidationException("Invalid configuration")));
.onErrorResume(WebClientResponseException.BadRequest.class,
RetryingKafkaConnectClient::parseConnectErrorMessage)
.onErrorResume(WebClientResponseException.InternalServerError.class,
RetryingKafkaConnectClient::parseConnectErrorMessage);
}

// Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
// Adding the connect runtime dependency for this single class seems excessive
private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}

private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
return Mono.error(new ValidationException(
Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message()));
}

@Override
@@ -176,7 +193,7 @@ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWith
}

@Override
public Flux<String> getConnectors(String search) throws WebClientResponseException {
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectors(search));
}

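For context on the error-handling change above, here is a hedged, self-contained sketch of the error payload the Kafka Connect REST API returns and how a record like the new ErrorMessage maps it with Jackson (2.12+ assumed for record support). The class name, example payload text, and the error_code field are illustrative, not taken from this commit.

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConnectErrorMessageSketch {
  // Mirrors the record added above; error_code is part of Connect's payload but unused here.
  record ErrorMessage(@JsonProperty("error_code") int errorCode,
                      @JsonProperty("message") String message) {}

  public static void main(String[] args) throws Exception {
    // Shape of a typical 400 response body from Kafka Connect (text is illustrative).
    String body = "{\"error_code\":400,\"message\":\"Connector configuration is invalid: missing connector.class\"}";

    ErrorMessage error = new ObjectMapper().readValue(body, ErrorMessage.class);

    // This message is what now surfaces through the ValidationException instead of
    // the previous generic "Invalid configuration" text.
    System.out.println(error.message());
  }
}
```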
18 changes: 17 additions & 1 deletion api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java
@@ -52,7 +52,23 @@ public static Predicate<TopicMessageDTO> noop() {

public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
return msg -> StringUtils.contains(msg.getKey(), string)
|| StringUtils.contains(msg.getContent(), string);
|| StringUtils.contains(msg.getContent(), string) || headersContains(msg, string);
}

private static boolean headersContains(TopicMessageDTO msg, String searchString) {
final var headers = msg.getHeaders();

if (headers == null) {
return false;
}

for (final var entry : headers.entrySet()) {
if (StringUtils.contains(entry.getKey(), searchString) || StringUtils.contains(entry.getValue(), searchString)) {
return true;
}
}

return false;
}

public static Predicate<TopicMessageDTO> celScriptFilter(String script) {
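A hedged usage sketch of the extended filter: the search string now also matches header keys and values, not just the message key and content. The TopicMessageDTO import location and setters used here (setKey, setContent, setHeaders) are assumed from the generated DTO and may differ slightly.

```java
import io.kafbat.ui.emitter.MessageFilters;
import io.kafbat.ui.model.TopicMessageDTO;
import java.util.Map;
import java.util.function.Predicate;

class HeaderFilterSketch {
  public static void main(String[] args) {
    TopicMessageDTO msg = new TopicMessageDTO();
    msg.setKey("order-42");
    msg.setContent("{\"status\":\"NEW\"}");
    msg.setHeaders(Map.of("trace-id", "abc123", "source", "billing"));

    Predicate<TopicMessageDTO> filter = MessageFilters.containsStringFilter("billing");

    // Matches via the "source" header value, even though neither key nor content contains it.
    System.out.println(filter.test(msg)); // true
  }
}
```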
2 changes: 2 additions & 0 deletions api/src/main/java/io/kafbat/ui/model/rbac/Role.java
@@ -1,5 +1,6 @@
package io.kafbat.ui.model.rbac;

import com.google.common.base.Preconditions;
import java.util.List;
import lombok.Data;

@@ -12,6 +13,7 @@ public class Role {
List<Permission> permissions;

public void validate() {
Preconditions.checkArgument(!clusters.isEmpty(), "Role clusters cannot be empty");
permissions.forEach(Permission::transform);
permissions.forEach(Permission::validate);
}
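A hedged sketch of what the new precondition changes in practice: a role whose clusters list is empty now fails validation immediately with an IllegalArgumentException from Guava's Preconditions. The setter names below come from Lombok's @Data on Role and are assumed rather than confirmed by this diff.

```java
import io.kafbat.ui.model.rbac.Role;
import java.util.List;

class RoleValidationSketch {
  public static void main(String[] args) {
    Role role = new Role();
    role.setClusters(List.of());      // no clusters assigned -> now rejected
    role.setPermissions(List.of());   // nothing left to transform/validate afterwards

    try {
      role.validate();
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "Role clusters cannot be empty"
    }
  }
}
```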
@@ -15,6 +15,7 @@
import com.google.protobuf.StructProto;
import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
import com.google.type.ColorProto;
@@ -147,12 +148,18 @@ public boolean canSerialize(String topic, Serde.Target type) {
@Override
public Serde.Serializer serializer(String topic, Serde.Target type) {
var descriptor = descriptorFor(topic, type).orElseThrow();
TypeRegistry typeRegistry = TypeRegistry.newBuilder()
.add(descriptorPaths.keySet())
.build();

return new Serde.Serializer() {
@SneakyThrows
@Override
public byte[] serialize(String input) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
JsonFormat.parser().merge(input, builder);
JsonFormat.parser()
.usingTypeRegistry(typeRegistry)
.merge(input, builder);
return builder.build().toByteArray();
}
};
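The serializer change above registers the known descriptors in a TypeRegistry so JsonFormat can resolve google.protobuf.Any fields while parsing. A hedged, standalone illustration using only protobuf-java(-util) well-known types; the failure-mode comment paraphrases the library's behaviour rather than quoting it.

```java
import com.google.protobuf.Any;
import com.google.protobuf.StringValue;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.util.JsonFormat;

class TypeRegistrySketch {
  public static void main(String[] args) throws Exception {
    String json =
        "{\"@type\":\"type.googleapis.com/google.protobuf.StringValue\",\"value\":\"hello\"}";

    // Analogous to TypeRegistry.newBuilder().add(descriptorPaths.keySet()) in the serde above.
    TypeRegistry registry = TypeRegistry.newBuilder()
        .add(StringValue.getDescriptor())
        .build();

    Any.Builder builder = Any.newBuilder();
    // Without usingTypeRegistry(...) the parser cannot resolve the @type URL and throws
    // an InvalidProtocolBufferException.
    JsonFormat.parser().usingTypeRegistry(registry).merge(json, builder);

    System.out.println(builder.build().unpack(StringValue.class).getValue()); // hello
  }
}
```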
@@ -149,6 +149,8 @@ private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdmi
case EMPTY -> 3;
case DEAD -> 4;
case UNKNOWN -> 5;
case ASSIGNING -> 6;
case RECONCILING -> 7;
};
var comparator = Comparator.comparingInt(statesPriorities);
yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
36 changes: 8 additions & 28 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
@@ -1,6 +1,5 @@
package io.kafbat.ui.service;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ConnectorStatus;
@@ -31,7 +30,6 @@
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -106,10 +104,7 @@ public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String con

public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName) {
return api(cluster, connectName)
.flux(client -> client.getConnectors(null))
// for some reason `getConnectors` method returns the response as a single string
.collectList().map(e -> e.get(0))
.map(this::parseConnectorsNamesStringToList)
.mono(client -> client.getConnectors(null))
.flatMapMany(Flux::fromIterable);
}

@@ -118,12 +113,6 @@ public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, St
return getConnectorNames(cluster, connectName).onErrorComplete();
}

@SneakyThrows
private List<String> parseConnectorsNamesStringToList(String json) {
return objectMapper.readValue(json, new TypeReference<>() {
});
}

public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
Mono<NewConnectorDTO> connector) {
return api(cluster, connectName)
@@ -218,22 +207,13 @@ public Mono<Void> deleteConnector(
public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
String connectorName, ConnectorActionDTO action) {
return api(cluster, connectName)
.mono(client -> {
switch (action) {
case RESTART:
return client.restartConnector(connectorName, false, false);
case RESTART_ALL_TASKS:
return restartTasks(cluster, connectName, connectorName, task -> true);
case RESTART_FAILED_TASKS:
return restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE:
return client.pauseConnector(connectorName);
case RESUME:
return client.resumeConnector(connectorName);
default:
throw new IllegalStateException("Unexpected value: " + action);
}
.mono(client -> switch (action) {
case RESTART -> client.restartConnector(connectorName, false, false);
case RESTART_ALL_TASKS -> restartTasks(cluster, connectName, connectorName, task -> true);
case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE -> client.pauseConnector(connectorName);
case RESUME -> client.resumeConnector(connectorName);
});
}

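The getConnectorNames simplification relies on the regenerated Connect client returning Mono<List<String>> (the Connect REST endpoint GET /connectors responds with a plain JSON array of names), so the list can be expanded straight into a Flux without the old "parse the single string element" workaround. A minimal Reactor sketch of that expansion, with a stubbed response and illustrative connector names:

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ConnectorNamesSketch {
  public static void main(String[] args) {
    // Stand-in for client.getConnectors(null); the names are illustrative.
    Mono<List<String>> response = Mono.just(List.of("postgres-sink", "debezium-source"));

    // Same expansion the service now performs.
    Flux<String> names = response.flatMapMany(Flux::fromIterable);

    names.subscribe(System.out::println); // postgres-sink, debezium-source
  }
}
```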
@@ -113,11 +113,11 @@ private boolean isAccessible(AuthenticatedUser user, AccessContext context) {
return context.isAccessible(getUserPermissions(user, context.cluster()));
}

private List<Permission> getUserPermissions(AuthenticatedUser user, String clusterName) {
private List<Permission> getUserPermissions(AuthenticatedUser user, @Nullable String clusterName) {
return properties.getRoles()
.stream()
.filter(filterRole(user))
.filter(role -> role.getClusters().stream().anyMatch(clusterName::equalsIgnoreCase))
.filter(role -> clusterName == null || role.getClusters().stream().anyMatch(clusterName::equalsIgnoreCase))
.flatMap(role -> role.getPermissions().stream())
.toList();
}
14 changes: 14 additions & 0 deletions api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java
@@ -5,6 +5,7 @@
import io.kafbat.ui.container.KafkaConnectContainer;
import io.kafbat.ui.container.KsqlDbContainer;
import io.kafbat.ui.container.SchemaRegistryContainer;
import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
@@ -22,6 +23,7 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.util.TestSocketUtils;
import org.springframework.util.ResourceUtils;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
@@ -75,6 +77,18 @@ public static class Initializer
public void initialize(@NotNull ConfigurableApplicationContext context) {
System.setProperty("kafka.clusters.0.name", LOCAL);
System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());

// Add ProtobufFileSerde configuration
System.setProperty("kafka.clusters.0.serde.0.name", "ProtobufFile");
System.setProperty("kafka.clusters.0.serde.0.topicValuesPattern", "masking-test-.*");
try {
System.setProperty("kafka.clusters.0.serde.0.properties.protobufFilesDir",
ResourceUtils.getFile("classpath:protobuf-serde").getAbsolutePath());
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
System.setProperty("kafka.clusters.0.serde.0.properties.protobufMessageName", "test.MessageWithAny");

// List unavailable hosts to verify failover
System.setProperty("kafka.clusters.0.schemaRegistry",
String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",