Skip to content

Commit

Permalink
Merge branch 'main' into auth_page_be
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean authored Dec 17, 2024
2 parents d91e8b9 + 4bb3632 commit 5ec0abe
Show file tree
Hide file tree
Showing 65 changed files with 320 additions and 207 deletions.
17 changes: 17 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,23 @@ updates:
- "type/dependencies"
- "scope/backend"

- package-ecosystem: docker
directory: "/api"
schedule:
interval: weekly
time: "10:00"
timezone: Europe/London
reviewers:
- "kafbat/backend"
open-pull-requests-limit: 10
ignore:
- dependency-name: "azul/zulu-openjdk-alpine"
# Limit dependabot pull requests to minor Java upgrades
update-types: ["version-update:semver-major"]
labels:
- "type/dependencies"
- "scope/backend"

- package-ecosystem: npm
directory: "/frontend"
schedule:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/backend_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/branch-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'
- name: Build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-public-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'
- name: Build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cve_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/e2e-run.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

Expand Down Expand Up @@ -72,7 +72,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v3
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/frontend_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:

- uses: pnpm/[email protected]
with:
version: 9.11.0
version: 9.15.0

- name: Install node
uses: actions/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-serde-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: "17"
java-version: "21"
distribution: "zulu"
cache: "maven"

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/separate_env_public_create.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '17'
java-version: '21'
distribution: 'zulu'
cache: 'maven'
- name: Build
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.

<p align="center">
<a href="https://ui.docs.kafbat.io/">Documentation</a> •
<a href="https://ui.docs.kafbat.io/configuration/quick-start">Quick Start</a> •
<a href="https://ui.docs.kafbat.io/quick-start/demo-run">Quick Start</a> •
<a href="https://discord.gg/4DWzD7pGE5">Community</a>
<br/>
<a href="https://aws.amazon.com/marketplace/pp/{replaceMe}">AWS Marketplace</a> •
Expand Down Expand Up @@ -50,7 +50,7 @@ We extend our gratitude to Provectus for their past support in groundbreaking wo
* **View Consumer Groups** — view per-partition parked offsets, combined and per-partition lag
* **Browse Messages** — browse messages with JSON, plain text, and Avro encoding
* **Dynamic Topic Configuration** — create and configure new topics with dynamic configuration
* **Configurable Authentification**[secure](https://ui.docs.kafbat.io/configuration/authentication) your installation with optional Github/Gitlab/Google OAuth 2.0
* **Configurable Authentication**[secure](https://ui.docs.kafbat.io/configuration/authentication) your installation with optional Github/Gitlab/Google OAuth 2.0
* **Custom serialization/deserialization plugins** - [use](https://ui.docs.kafbat.io/configuration/serialization-serde) a ready-to-go serde for your data like AWS Glue or Smile, or code your own!
* **Role based access control** - [manage permissions](https://ui.docs.kafbat.io/configuration/rbac-role-based-access-control) to access the UI with granular precision
* **Data masking** - [obfuscate](https://ui.docs.kafbat.io/configuration/data-masking) sensitive data in topic messages
Expand Down
5 changes: 4 additions & 1 deletion api/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
FROM azul/zulu-openjdk-alpine:17.0.11-jre-headless
# The tag is ignored when a sha is included but the reason to add it are:
# 1. Self Documentation: It is difficult to find out what the expected tag is given a sha alone
# 2. Helps dependabot during discovery of upgrades
FROM azul/zulu-openjdk-alpine:21.0.5-jre-headless@sha256:842b23baf96980437281b7419af970238b4c1c3109e50beac5299cdc278291d7

RUN apk add --no-cache \
# snappy codec
Expand Down
2 changes: 1 addition & 1 deletion api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
<version>1.13.3</version>
<version>1.14.2</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kafbat.ui.client;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.ApiClient;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
Expand All @@ -14,9 +15,11 @@
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.util.WebClientConfigurator;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
Expand Down Expand Up @@ -58,10 +61,24 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {

private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
Mono.error(new ValidationException("Invalid configuration")))
.onErrorResume(WebClientResponseException.InternalServerError.class, e ->
Mono.error(new ValidationException("Invalid configuration")));
.onErrorResume(WebClientResponseException.BadRequest.class,
RetryingKafkaConnectClient::parseConnectErrorMessage)
.onErrorResume(WebClientResponseException.InternalServerError.class,
RetryingKafkaConnectClient::parseConnectErrorMessage);
}

// Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
// Adding the connect runtime dependency for this single class seems excessive
private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}

private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
return Mono.error(new ValidationException(
Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message()));
}

@Override
Expand Down Expand Up @@ -176,7 +193,7 @@ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWith
}

@Override
public Flux<String> getConnectors(String search) throws WebClientResponseException {
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectors(search));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientProperties;
Expand Down Expand Up @@ -41,7 +41,7 @@
@EnableWebFluxSecurity
@EnableReactiveMethodSecurity
@RequiredArgsConstructor
@Log4j2
@Slf4j
public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {

private final OAuthProperties properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(Strin
if (cursor != null) {
messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
} else {
var pollingMode = mode == null ? PollingModeDTO.LATEST : mode;
messagesFlux = messagesService.loadMessages(
getCluster(clusterName),
topicName,
ConsumerPosition.create(checkNotNull(mode), checkNotNull(topicName), partitions, timestamp, offset),
ConsumerPosition.create(pollingMode, checkNotNull(topicName), partitions, timestamp, offset),
stringFilter,
smartFilterId,
limit,
Expand Down
18 changes: 17 additions & 1 deletion api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,23 @@ public static Predicate<TopicMessageDTO> noop() {

public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
return msg -> StringUtils.contains(msg.getKey(), string)
|| StringUtils.contains(msg.getContent(), string);
|| StringUtils.contains(msg.getContent(), string) || headersContains(msg, string);
}

private static boolean headersContains(TopicMessageDTO msg, String searchString) {
final var headers = msg.getHeaders();

if (headers == null) {
return false;
}

for (final var entry : headers.entrySet()) {
if (StringUtils.contains(entry.getKey(), searchString) || StringUtils.contains(entry.getValue(), searchString)) {
return true;
}
}

return false;
}

public static Predicate<TopicMessageDTO> celScriptFilter(String script) {
Expand Down
2 changes: 2 additions & 0 deletions api/src/main/java/io/kafbat/ui/model/rbac/Role.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kafbat.ui.model.rbac;

import com.google.common.base.Preconditions;
import java.util.List;
import lombok.Data;

Expand All @@ -12,6 +13,7 @@ public class Role {
List<Permission> permissions;

public void validate() {
Preconditions.checkArgument(!clusters.isEmpty(), "Role clusters cannot be empty");
permissions.forEach(Permission::transform);
permissions.forEach(Permission::validate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public ProducerRecord<byte[], byte[]> create(String topic,

private Iterable<Header> createHeaders(Map<String, String> clientHeaders) {
RecordHeaders headers = new RecordHeaders();
clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes())));
clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v == null ? null : v.getBytes())));
return headers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdmi
case EMPTY -> 3;
case DEAD -> 4;
case UNKNOWN -> 5;
case ASSIGNING -> 6;
case RECONCILING -> 7;
};
var comparator = Comparator.comparingInt(statesPriorities);
yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
Expand Down
36 changes: 8 additions & 28 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kafbat.ui.service;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
import io.kafbat.ui.connect.model.ConnectorStatus;
Expand Down Expand Up @@ -31,7 +30,6 @@
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -106,10 +104,7 @@ public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String con

public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName) {
return api(cluster, connectName)
.flux(client -> client.getConnectors(null))
// for some reason `getConnectors` method returns the response as a single string
.collectList().map(e -> e.get(0))
.map(this::parseConnectorsNamesStringToList)
.mono(client -> client.getConnectors(null))
.flatMapMany(Flux::fromIterable);
}

Expand All @@ -118,12 +113,6 @@ public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, St
return getConnectorNames(cluster, connectName).onErrorComplete();
}

@SneakyThrows
private List<String> parseConnectorsNamesStringToList(String json) {
return objectMapper.readValue(json, new TypeReference<>() {
});
}

public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
Mono<NewConnectorDTO> connector) {
return api(cluster, connectName)
Expand Down Expand Up @@ -218,22 +207,13 @@ public Mono<Void> deleteConnector(
public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
String connectorName, ConnectorActionDTO action) {
return api(cluster, connectName)
.mono(client -> {
switch (action) {
case RESTART:
return client.restartConnector(connectorName, false, false);
case RESTART_ALL_TASKS:
return restartTasks(cluster, connectName, connectorName, task -> true);
case RESTART_FAILED_TASKS:
return restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE:
return client.pauseConnector(connectorName);
case RESUME:
return client.resumeConnector(connectorName);
default:
throw new IllegalStateException("Unexpected value: " + action);
}
.mono(client -> switch (action) {
case RESTART -> client.restartConnector(connectorName, false, false);
case RESTART_ALL_TASKS -> restartTasks(cluster, connectName, connectorName, task -> true);
case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE -> client.pauseConnector(connectorName);
case RESUME -> client.resumeConnector(connectorName);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ private boolean isAccessible(AuthenticatedUser user, AccessContext context) {
return context.isAccessible(getUserPermissions(user, context.cluster()));
}

private List<Permission> getUserPermissions(AuthenticatedUser user, String clusterName) {
private List<Permission> getUserPermissions(AuthenticatedUser user, @Nullable String clusterName) {
return properties.getRoles()
.stream()
.filter(filterRole(user))
.filter(role -> role.getClusters().stream().anyMatch(clusterName::equalsIgnoreCase))
.filter(role -> clusterName == null || role.getClusters().stream().anyMatch(clusterName::equalsIgnoreCase))
.flatMap(role -> role.getPermissions().stream())
.toList();
}
Expand Down
Loading

0 comments on commit 5ec0abe

Please sign in to comment.