Skip to content

Commit

Permalink
Allow to hide internal & stream topic on topic list
Browse files Browse the repository at this point in the history
- close #30
  • Loading branch information
tchiotludo committed May 11, 2019
1 parent 3c6cf23 commit 7e29059
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 25 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,21 @@ file example can be found here :[application.example.yml](application.example.ym
* `ssl.key-store`: /app/truststore.jks
* `ssl.key-store-password`: key-store-password

### KafkaHQ configuration

#### Topic List
* `kafkahq.topic.default-view` is default list view (ALL, HIDE_INTERNAL, HIDE_INTERNAL_STREAM, HIDE_STREAM)
* `kafkahq.topic.internal-regexps` is list of regexp to be considered as internal (internal topic can't be deleted or updated)
* `kafkahq.topic.stream-regexps` is list of regexp to be considered as internal stream topic


#### Topic Data
* `kafkahq.topic-data.sort`: default sort order (OLDEST, NEWEST) (default: OLDEST)
* `kafkahq.topic-data.size`: max record per page (default: 50)
* `kafkahq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the
buffer (default: 1000).


### Security
* `kafkahq.security.default-roles`: Roles available for all the user even unlogged user, roles available are :
* `topic/read`
Expand Down
14 changes: 14 additions & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ kafkahq:
ssl.keystore.password: password
ssl.key.password: password

# Topic list display options (optional)
topic:
default-view: HIDE_INTERNAL # default list view (ALL, HIDE_INTERNAL, HIDE_INTERNAL_STREAM, HIDE_STREAM)
internal-regexps: # list of regexp to be considered as internal (internal topic can't be deleted or updated)
- "^_.*$"
- "^.*_schemas$"
- "^.*connect-config$"
- "^.*connect-offsets$1"
- "^.*connect-status$"
stream-regexps: # list of regexp to be considered as internal stream topic
- "^.*-changelog$"
- "^.*-repartition$"
- "^.*-rekey$"

# Topic display data options (optional)
topic-data:
sort: OLDEST # default sort order (OLDEST, NEWEST) (default: OLDEST)
Expand Down
13 changes: 13 additions & 0 deletions assets/modules/datas/filter.scss
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,17 @@
order: 3;
}
}

@include media-breakpoint-up(sm) {
select {
max-width: 200px;
}
}

@include media-breakpoint-down(xs) {
button.btn-primary {
display: block;
width: 100%;
}
}
}
10 changes: 8 additions & 2 deletions src/main/java/org/kafkahq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
Expand Down Expand Up @@ -44,6 +45,8 @@ public class TopicController extends AbstractController {
private RecordRepository recordRepository;
private FreemarkerViewsRenderer freemarkerViewsRenderer;
private Environment environment;
@Value("${kafkahq.topic.default-view}")
private String defaultView;

@Inject
public TopicController(TopicRepository topicRepository,
Expand All @@ -61,12 +64,15 @@ public TopicController(TopicRepository topicRepository,

@View("topicList")
@Get
public HttpResponse list(HttpRequest request, String cluster, Optional<String> search) throws ExecutionException, InterruptedException {
public HttpResponse list(HttpRequest request, String cluster, Optional<String> search, Optional<TopicRepository.TopicListView> show) throws ExecutionException, InterruptedException {
TopicRepository.TopicListView topicListView = show.orElse(TopicRepository.TopicListView.valueOf(defaultView));

return this.template(
request,
cluster,
"search", search,
"topics", this.topicRepository.list(search)
"topicListView", topicListView,
"topics", this.topicRepository.list(show.orElse(TopicRepository.TopicListView.valueOf(defaultView)), search)
);
}

Expand Down
17 changes: 16 additions & 1 deletion src/main/java/org/kafkahq/models/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,26 @@
public class Topic {
private String name;
private boolean internal;
private boolean configInternal;
private boolean configStream;
private final List<Partition> partitions = new ArrayList<>();
private List<ConsumerGroup> consumerGroups;

public Topic(
TopicDescription description,
List<ConsumerGroup> consumerGroup,
List<LogDir> logDirs,
List<Partition.Offsets> offsets
List<Partition.Offsets> offsets,
boolean configInternal,
boolean configStream
) {
this.name = description.name();
this.internal = description.isInternal();
this.consumerGroups = consumerGroup;

this.configInternal = configInternal;
this.configStream = configStream;

for (TopicPartitionInfo partition : description.partitions()) {
this.partitions.add(new Partition(
description.name(),
Expand All @@ -50,6 +57,14 @@ public Topic(
}
}

public boolean isInternal() {
return this.internal || this.configInternal;
}

public boolean isStream() {
return this.configStream;
}

public List<Node.Partition> getReplicas() {
return this.getPartitions().stream()
.flatMap(partition -> partition.getNodes().stream())
Expand Down
47 changes: 44 additions & 3 deletions src/main/java/org/kafkahq/repositories/TopicRepository.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.kafkahq.repositories;

import io.micronaut.context.annotation.Value;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
Expand All @@ -19,6 +20,12 @@ public class TopicRepository extends AbstractRepository {
private LogDirRepository logDirRepository;
private ConfigRepository configRepository;

@Value("${kafkahq.topic.internal-regexps}")
protected List<String> internalRegexps;

@Value("${kafkahq.topic.stream-regexps}")
protected List<String> streamRegexps;

@Inject
public TopicRepository(KafkaModule kafkaModule, ConsumerGroupRepository consumerGroupRepository, LogDirRepository logDirRepository, ConfigRepository configRepository) {
this.kafkaModule = kafkaModule;
Expand All @@ -27,13 +34,20 @@ public TopicRepository(KafkaModule kafkaModule, ConsumerGroupRepository consumer
this.configRepository = configRepository;
}

public List<Topic> list(Optional<String> search) throws ExecutionException, InterruptedException {
public enum TopicListView {
ALL,
HIDE_INTERNAL,
HIDE_INTERNAL_STREAM,
HIDE_STREAM,
}

public List<Topic> list(TopicListView view, Optional<String> search) throws ExecutionException, InterruptedException {
ArrayList<String> list = new ArrayList<>();

Collection<TopicListing> listTopics = kafkaWrapper.listTopics();

for (TopicListing item : listTopics) {
if (isSearchMatch(search, item.name())) {
if (isSearchMatch(search, item.name()) && isListViewMatch(view, item.name())) {
list.add(item.name());
}
}
Expand All @@ -44,6 +58,19 @@ public List<Topic> list(Optional<String> search) throws ExecutionException, Inte
return topics;
}

public boolean isListViewMatch(TopicListView view, String value) {
switch (view) {
case HIDE_STREAM:
return !isStream(value);
case HIDE_INTERNAL:
return !isInternal(value);
case HIDE_INTERNAL_STREAM:
return !isInternal(value) && !isStream(value);
}

return true;
}

public Topic findByName(String name) throws ExecutionException, InterruptedException {
Optional<Topic> topics = this.findByName(Collections.singletonList(name)).stream().findFirst();

Expand All @@ -62,14 +89,28 @@ public List<Topic> findByName(List<String> topics) throws ExecutionException, In
description.getValue(),
consumerGroupRepository.findByTopic(description.getValue().name()),
logDirRepository.findByTopic(description.getValue().name()),
topicOffsets.get(description.getValue().name())
topicOffsets.get(description.getValue().name()),
isInternal(description.getValue().name()),
isStream(description.getValue().name())
)
);
}

return list;
}

private boolean isInternal(String name) {
return this.internalRegexps
.stream()
.anyMatch(name::matches);
}

private boolean isStream(String name) {
return this.streamRegexps
.stream()
.anyMatch(name::matches);
}

public void create(String clusterId, String name, int partitions, short replicationFactor, List<org.kafkahq.models.Config> configs) throws ExecutionException, InterruptedException {
kafkaModule
.getAdminClient(clusterId)
Expand Down
13 changes: 13 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@ kafkahq:
group.id: KafkaHQ
enable.auto.commit: "false"

topic:
default-view: HIDE_INTERNAL
internal-regexps:
- "^_.*$"
- "^.*_schemas$"
- "^.*connect-config$"
- "^.*connect-offsets$1"
- "^.*connect-status$"
stream-regexps:
- "^.*-changelog$"
- "^.*-repartition$"
- "^.*-rekey$"

topic-data:
sort: OLDEST
size: 50
Expand Down
36 changes: 21 additions & 15 deletions src/main/resources/views/blocks/navbar-search.ftl
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
<#ftl output_format="HTML">

<#-- @ftlvariable name="search" type="java.util.Optional<java.lang.String>" -->
<#-- @ftlvariable name="topicListView" type="org.kafkahq.repositories.TopicRepository.TopicListView" -->

<nav class="navbar navbar-expand-lg navbar-light bg-light mr-auto khq-data-filter">
<form class="form-inline khq-form-get" method="get">
<div class="input-group">
<input class="form-control"
name="search"
placeholder="Search"
autocomplete="off"
type="text"
<#if search.isPresent()>
value="${search.get()}"
</#if> />
<div class="input-group-append">
<button class="btn btn-primary" type="button">
<i class="fa fa-search"></i>
</button>
</div>
</div>
<input class="form-control"
name="search"
placeholder="Search"
autocomplete="off"
type="text"
<#if search.isPresent()>
value="${search.get()}"
</#if> />
<#if topicListView??>
<select name="show" class="custom-select ml-sm-2 mt-2 mt-sm-0">
<option ${(topicListView.toString() == "ALL")?then("selected", "")} value="ALL">Show all topics</option>
<option ${(topicListView.toString() == "HIDE_INTERNAL")?then("selected", "")} value="HIDE_INTERNAL">Hide internal topics</option>
<option ${(topicListView.toString() == "HIDE_INTERNAL_STREAM")?then("selected", "")} value="HIDE_INTERNAL_STREAM">Hide internal & stream topics</option>
<option ${(topicListView.toString() == "HIDE_STREAM")?then("selected", "")} value="HIDE_STREAM">Hide stream topics</option>
</select>
</#if>

<button class="btn btn-primary ml-sm-2 mt-2 mt-sm-0" type="submit">
<span class="d-sm-none">Search </span><i class="fa fa-search"></i>
</button>
</form>
</nav>
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public void start() throws Exception {
log.debug("Schema registry is running at {}", schemaRegistryUrl());

// connect
ServerSocket s = new ServerSocket(0);

Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers());
properties.put("key.converter", "io.confluent.connect.avro.AvroConverter");
Expand Down
19 changes: 17 additions & 2 deletions src/test/java/org/kafkahq/repositories/TopicRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,27 @@ public class TopicRepositoryTest {

@Test
public void list() throws ExecutionException, InterruptedException {
assertEquals(14, topicRepository.list(Optional.empty()).size());
assertEquals(14, topicRepository.list(TopicRepository.TopicListView.ALL, Optional.empty()).size());
}

@Test
public void listNoInternal() throws ExecutionException, InterruptedException {
assertEquals(9, topicRepository.list(TopicRepository.TopicListView.HIDE_INTERNAL, Optional.empty()).size());
}

@Test
public void listNoInternalStream() throws ExecutionException, InterruptedException {
assertEquals(7, topicRepository.list(TopicRepository.TopicListView.HIDE_INTERNAL_STREAM, Optional.empty()).size());
}

@Test
public void listNoStream() throws ExecutionException, InterruptedException {
assertEquals(12, topicRepository.list(TopicRepository.TopicListView.HIDE_STREAM, Optional.empty()).size());
}

@Test
public void search() throws ExecutionException, InterruptedException {
assertEquals(1, topicRepository.list(Optional.of("ra do")).size());
assertEquals(1, topicRepository.list(TopicRepository.TopicListView.ALL, Optional.of("ra do")).size());
}

@Test
Expand Down
13 changes: 13 additions & 0 deletions src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ kafkahq:
access-log:
enabled: false

topic:
default-view: HIDE_INTERNAL
internal-regexps:
- "^_.*$"
- "^.*_schemas$"
- "^.*connect-config$"
- "^.*connect-offsets$1"
- "^.*connect-status$"
stream-regexps:
- "^.*-changelog$"
- "^.*-repartition$"
- "^.*-rekey$"

security:
basic-auth:
user:
Expand Down

0 comments on commit 7e29059

Please sign in to comment.