diff --git a/src/main/java/org/akhq/modules/AbstractKafkaWrapper.java b/src/main/java/org/akhq/modules/AbstractKafkaWrapper.java index 3aaf3b62d..93d2e45f6 100644 --- a/src/main/java/org/akhq/modules/AbstractKafkaWrapper.java +++ b/src/main/java/org/akhq/modules/AbstractKafkaWrapper.java @@ -87,10 +87,17 @@ public Map describeTopics(String clusterId, List configs) throws ExecutionException { + Map kafkaTopicConfigs = new HashMap<>(); + + configs.forEach(c-> kafkaTopicConfigs.put(c.getName(), c.getValue())); + + NewTopic topic = new NewTopic(name, partitions, replicationFactor).configs(kafkaTopicConfigs); + Logger.call(kafkaModule .getAdminClient(clusterId) - .createTopics(Collections.singleton(new NewTopic(name, partitions, replicationFactor))) + .createTopics(Collections.singleton(topic)) .all(), "Create Topics", Collections.singletonList(name) diff --git a/src/main/java/org/akhq/repositories/TopicRepository.java b/src/main/java/org/akhq/repositories/TopicRepository.java index 8fc3f53c7..d293a5ade 100644 --- a/src/main/java/org/akhq/repositories/TopicRepository.java +++ b/src/main/java/org/akhq/repositories/TopicRepository.java @@ -137,9 +137,7 @@ private boolean isStream(String name) { } public void create(String clusterId, String name, int partitions, short replicationFactor, List configs) throws ExecutionException, InterruptedException { - kafkaWrapper.createTopics(clusterId, name, partitions, replicationFactor); - checkIfTopicExists(clusterId, name); - configRepository.updateTopic(clusterId, name, configs); + kafkaWrapper.createTopics(clusterId, name, partitions, replicationFactor, configs); } public void delete(String clusterId, String name) throws ExecutionException, InterruptedException { diff --git a/src/test/java/org/akhq/repositories/TopicRepositoryTest.java b/src/test/java/org/akhq/repositories/TopicRepositoryTest.java index 533617163..ce1da2a12 100644 --- a/src/test/java/org/akhq/repositories/TopicRepositoryTest.java +++ b/src/test/java/org/akhq/repositories/TopicRepositoryTest.java @@ -130,20 +130,30 @@ void findByNameWithTopicRegex() throws ExecutionException, InterruptedException @Test void create() throws ExecutionException, InterruptedException { - topicRepository.create(KafkaTestCluster.CLUSTER_ID, "create", 8, (short) 1, Collections.singletonList( + topicRepository.create(KafkaTestCluster.CLUSTER_ID, "createEmptyConfig", 8, (short) 1, Collections.emptyList() + ); + + assertEquals(8, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, "createEmptyConfig").getPartitions().size()); + + topicRepository.delete(KafkaTestCluster.CLUSTER_ID, "createEmptyConfig"); + } + + @Test + void createWithConfig() throws ExecutionException, InterruptedException { + topicRepository.create(KafkaTestCluster.CLUSTER_ID, "createWithConfig", 8, (short) 1, Collections.singletonList( new Config(TopicConfig.SEGMENT_MS_CONFIG, "1000") )); - Optional option = configRepository.findByTopic(KafkaTestCluster.CLUSTER_ID, "create") + Optional option = configRepository.findByTopic(KafkaTestCluster.CLUSTER_ID, "createWithConfig") .stream() .filter(r -> r.getName().equals(TopicConfig.SEGMENT_MS_CONFIG)) .findFirst() .map(Config::getValue); - assertEquals(8, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, "create").getPartitions().size()); + assertEquals(8, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, "createWithConfig").getPartitions().size()); assertEquals("1000", option.get()); - topicRepository.delete(KafkaTestCluster.CLUSTER_ID, "create"); + topicRepository.delete(KafkaTestCluster.CLUSTER_ID, "createWithConfig"); } @Test