From 7e29059850116c52695becb180650964ec013051 Mon Sep 17 00:00:00 2001 From: tchiotludo Date: Sat, 11 May 2019 14:36:21 +0200 Subject: [PATCH] Allow to hide internal & stream topic on topic list - close #30 --- README.md | 14 ++++++ application.example.yml | 14 ++++++ assets/modules/datas/filter.scss | 13 +++++ .../kafkahq/controllers/TopicController.java | 10 +++- src/main/java/org/kafkahq/models/Topic.java | 17 ++++++- .../kafkahq/repositories/TopicRepository.java | 47 +++++++++++++++++-- src/main/resources/application.yml | 13 +++++ .../resources/views/blocks/navbar-search.ftl | 36 ++++++++------ .../EmbeddedSingleNodeKafkaCluster.java | 2 - .../repositories/TopicRepositoryTest.java | 19 +++++++- src/test/resources/application.yml | 13 +++++ 11 files changed, 173 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 3b4b49194..3fef5a5b6 100644 --- a/README.md +++ b/README.md @@ -110,7 +110,21 @@ file example can be found here :[application.example.yml](application.example.ym * `ssl.key-store`: /app/truststore.jks * `ssl.key-store-password`: key-store-password +### KafkaHQ configuration +#### Topic List +* `kafkahq.topic.default-view` is default list view (ALL, HIDE_INTERNAL, HIDE_INTERNAL_STREAM, HIDE_STREAM) +* `kafkahq.topic.internal-regexps` is list of regexp to be considered as internal (internal topic can't be deleted or updated) +* `kafkahq.topic.stream-regexps` is list of regexp to be considered as internal stream topic + + +#### Topic Data +* `kafkahq.topic-data.sort`: default sort order (OLDEST, NEWEST) (default: OLDEST) +* `kafkahq.topic-data.size`: max record per page (default: 50) +* `kafkahq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the + buffer (default: 1000). + + ### Security * `kafkahq.security.default-roles`: Roles available for all the user even unlogged user, roles available are : * `topic/read` diff --git a/application.example.yml b/application.example.yml index 90dd181f7..4b529308f 100644 --- a/application.example.yml +++ b/application.example.yml @@ -57,6 +57,20 @@ kafkahq: ssl.keystore.password: password ssl.key.password: password + # Topic list display options (optional) + topic: + default-view: HIDE_INTERNAL # default list view (ALL, HIDE_INTERNAL, HIDE_INTERNAL_STREAM, HIDE_STREAM) + internal-regexps: # list of regexp to be considered as internal (internal topic can't be deleted or updated) + - "^_.*$" + - "^.*_schemas$" + - "^.*connect-config$" + - "^.*connect-offsets$1" + - "^.*connect-status$" + stream-regexps: # list of regexp to be considered as internal stream topic + - "^.*-changelog$" + - "^.*-repartition$" + - "^.*-rekey$" + # Topic display data options (optional) topic-data: sort: OLDEST # default sort order (OLDEST, NEWEST) (default: OLDEST) diff --git a/assets/modules/datas/filter.scss b/assets/modules/datas/filter.scss index 06ea5d0a5..fd8c4ef06 100644 --- a/assets/modules/datas/filter.scss +++ b/assets/modules/datas/filter.scss @@ -9,4 +9,17 @@ order: 3; } } + + @include media-breakpoint-up(sm) { + select { + max-width: 200px; + } + } + + @include media-breakpoint-down(xs) { + button.btn-primary { + display: block; + width: 100%; + } + } } \ No newline at end of file diff --git a/src/main/java/org/kafkahq/controllers/TopicController.java b/src/main/java/org/kafkahq/controllers/TopicController.java index 4b53798ea..c939e9214 100644 --- a/src/main/java/org/kafkahq/controllers/TopicController.java +++ b/src/main/java/org/kafkahq/controllers/TopicController.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; +import io.micronaut.context.annotation.Value; import io.micronaut.context.env.Environment; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -44,6 +45,8 @@ public class TopicController extends AbstractController { private RecordRepository recordRepository; private FreemarkerViewsRenderer freemarkerViewsRenderer; private Environment environment; + @Value("${kafkahq.topic.default-view}") + private String defaultView; @Inject public TopicController(TopicRepository topicRepository, @@ -61,12 +64,15 @@ public TopicController(TopicRepository topicRepository, @View("topicList") @Get - public HttpResponse list(HttpRequest request, String cluster, Optional search) throws ExecutionException, InterruptedException { + public HttpResponse list(HttpRequest request, String cluster, Optional search, Optional show) throws ExecutionException, InterruptedException { + TopicRepository.TopicListView topicListView = show.orElse(TopicRepository.TopicListView.valueOf(defaultView)); + return this.template( request, cluster, "search", search, - "topics", this.topicRepository.list(search) + "topicListView", topicListView, + "topics", this.topicRepository.list(show.orElse(TopicRepository.TopicListView.valueOf(defaultView)), search) ); } diff --git a/src/main/java/org/kafkahq/models/Topic.java b/src/main/java/org/kafkahq/models/Topic.java index e0f31f8f1..714894952 100644 --- a/src/main/java/org/kafkahq/models/Topic.java +++ b/src/main/java/org/kafkahq/models/Topic.java @@ -20,6 +20,8 @@ public class Topic { private String name; private boolean internal; + private boolean configInternal; + private boolean configStream; private final List partitions = new ArrayList<>(); private List consumerGroups; @@ -27,12 +29,17 @@ public Topic( TopicDescription description, List consumerGroup, List logDirs, - List offsets + List offsets, + boolean configInternal, + boolean configStream ) { this.name = description.name(); this.internal = description.isInternal(); this.consumerGroups = consumerGroup; + this.configInternal = configInternal; + this.configStream = configStream; + for (TopicPartitionInfo partition : description.partitions()) { this.partitions.add(new Partition( description.name(), @@ -50,6 +57,14 @@ public Topic( } } + public boolean isInternal() { + return this.internal || this.configInternal; + } + + public boolean isStream() { + return this.configStream; + } + public List getReplicas() { return this.getPartitions().stream() .flatMap(partition -> partition.getNodes().stream()) diff --git a/src/main/java/org/kafkahq/repositories/TopicRepository.java b/src/main/java/org/kafkahq/repositories/TopicRepository.java index 563def0a9..4c005196e 100644 --- a/src/main/java/org/kafkahq/repositories/TopicRepository.java +++ b/src/main/java/org/kafkahq/repositories/TopicRepository.java @@ -1,5 +1,6 @@ package org.kafkahq.repositories; +import io.micronaut.context.annotation.Value; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicListing; @@ -19,6 +20,12 @@ public class TopicRepository extends AbstractRepository { private LogDirRepository logDirRepository; private ConfigRepository configRepository; + @Value("${kafkahq.topic.internal-regexps}") + protected List internalRegexps; + + @Value("${kafkahq.topic.stream-regexps}") + protected List streamRegexps; + @Inject public TopicRepository(KafkaModule kafkaModule, ConsumerGroupRepository consumerGroupRepository, LogDirRepository logDirRepository, ConfigRepository configRepository) { this.kafkaModule = kafkaModule; @@ -27,13 +34,20 @@ public TopicRepository(KafkaModule kafkaModule, ConsumerGroupRepository consumer this.configRepository = configRepository; } - public List list(Optional search) throws ExecutionException, InterruptedException { + public enum TopicListView { + ALL, + HIDE_INTERNAL, + HIDE_INTERNAL_STREAM, + HIDE_STREAM, + } + + public List list(TopicListView view, Optional search) throws ExecutionException, InterruptedException { ArrayList list = new ArrayList<>(); Collection listTopics = kafkaWrapper.listTopics(); for (TopicListing item : listTopics) { - if (isSearchMatch(search, item.name())) { + if (isSearchMatch(search, item.name()) && isListViewMatch(view, item.name())) { list.add(item.name()); } } @@ -44,6 +58,19 @@ public List list(Optional search) throws ExecutionException, Inte return topics; } + public boolean isListViewMatch(TopicListView view, String value) { + switch (view) { + case HIDE_STREAM: + return !isStream(value); + case HIDE_INTERNAL: + return !isInternal(value); + case HIDE_INTERNAL_STREAM: + return !isInternal(value) && !isStream(value); + } + + return true; + } + public Topic findByName(String name) throws ExecutionException, InterruptedException { Optional topics = this.findByName(Collections.singletonList(name)).stream().findFirst(); @@ -62,7 +89,9 @@ public List findByName(List topics) throws ExecutionException, In description.getValue(), consumerGroupRepository.findByTopic(description.getValue().name()), logDirRepository.findByTopic(description.getValue().name()), - topicOffsets.get(description.getValue().name()) + topicOffsets.get(description.getValue().name()), + isInternal(description.getValue().name()), + isStream(description.getValue().name()) ) ); } @@ -70,6 +99,18 @@ public List findByName(List topics) throws ExecutionException, In return list; } + private boolean isInternal(String name) { + return this.internalRegexps + .stream() + .anyMatch(name::matches); + } + + private boolean isStream(String name) { + return this.streamRegexps + .stream() + .anyMatch(name::matches); + } + public void create(String clusterId, String name, int partitions, short replicationFactor, List configs) throws ExecutionException, InterruptedException { kafkaModule .getAdminClient(clusterId) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 937f79693..35aba452c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -50,6 +50,19 @@ kafkahq: group.id: KafkaHQ enable.auto.commit: "false" + topic: + default-view: HIDE_INTERNAL + internal-regexps: + - "^_.*$" + - "^.*_schemas$" + - "^.*connect-config$" + - "^.*connect-offsets$1" + - "^.*connect-status$" + stream-regexps: + - "^.*-changelog$" + - "^.*-repartition$" + - "^.*-rekey$" + topic-data: sort: OLDEST size: 50 diff --git a/src/main/resources/views/blocks/navbar-search.ftl b/src/main/resources/views/blocks/navbar-search.ftl index 3ee28bd00..3d5eb6e47 100644 --- a/src/main/resources/views/blocks/navbar-search.ftl +++ b/src/main/resources/views/blocks/navbar-search.ftl @@ -1,23 +1,29 @@ <#ftl output_format="HTML"> <#-- @ftlvariable name="search" type="java.util.Optional" --> +<#-- @ftlvariable name="topicListView" type="org.kafkahq.repositories.TopicRepository.TopicListView" --> \ No newline at end of file diff --git a/src/test/java/org/kafkahq/clusters/EmbeddedSingleNodeKafkaCluster.java b/src/test/java/org/kafkahq/clusters/EmbeddedSingleNodeKafkaCluster.java index 283426592..e988b921d 100644 --- a/src/test/java/org/kafkahq/clusters/EmbeddedSingleNodeKafkaCluster.java +++ b/src/test/java/org/kafkahq/clusters/EmbeddedSingleNodeKafkaCluster.java @@ -59,8 +59,6 @@ public void start() throws Exception { log.debug("Schema registry is running at {}", schemaRegistryUrl()); // connect - ServerSocket s = new ServerSocket(0); - Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers()); properties.put("key.converter", "io.confluent.connect.avro.AvroConverter"); diff --git a/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java b/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java index d9aec2c84..c61b62978 100644 --- a/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java +++ b/src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java @@ -26,12 +26,27 @@ public class TopicRepositoryTest { @Test public void list() throws ExecutionException, InterruptedException { - assertEquals(14, topicRepository.list(Optional.empty()).size()); + assertEquals(14, topicRepository.list(TopicRepository.TopicListView.ALL, Optional.empty()).size()); + } + + @Test + public void listNoInternal() throws ExecutionException, InterruptedException { + assertEquals(9, topicRepository.list(TopicRepository.TopicListView.HIDE_INTERNAL, Optional.empty()).size()); + } + + @Test + public void listNoInternalStream() throws ExecutionException, InterruptedException { + assertEquals(7, topicRepository.list(TopicRepository.TopicListView.HIDE_INTERNAL_STREAM, Optional.empty()).size()); + } + + @Test + public void listNoStream() throws ExecutionException, InterruptedException { + assertEquals(12, topicRepository.list(TopicRepository.TopicListView.HIDE_STREAM, Optional.empty()).size()); } @Test public void search() throws ExecutionException, InterruptedException { - assertEquals(1, topicRepository.list(Optional.of("ra do")).size()); + assertEquals(1, topicRepository.list(TopicRepository.TopicListView.ALL, Optional.of("ra do")).size()); } @Test diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index d6c469c93..260e4ac54 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -4,6 +4,19 @@ kafkahq: access-log: enabled: false + topic: + default-view: HIDE_INTERNAL + internal-regexps: + - "^_.*$" + - "^.*_schemas$" + - "^.*connect-config$" + - "^.*connect-offsets$1" + - "^.*connect-status$" + stream-regexps: + - "^.*-changelog$" + - "^.*-repartition$" + - "^.*-rekey$" + security: basic-auth: user: