diff --git a/kafka-admin-client/src/main/java/io/mosip/kafkaadminclient/MosipKafkaAdminClient.java b/kafka-admin-client/src/main/java/io/mosip/kafkaadminclient/MosipKafkaAdminClient.java index 8fb9741..f04a8ba 100644 --- a/kafka-admin-client/src/main/java/io/mosip/kafkaadminclient/MosipKafkaAdminClient.java +++ b/kafka-admin-client/src/main/java/io/mosip/kafkaadminclient/MosipKafkaAdminClient.java @@ -1,15 +1,21 @@ package io.mosip.kafkaadminclient; +import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.errors.TopicExistsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,4 +41,25 @@ public void createTopic(String topicName) throws Exception { future.get(); } } + + public boolean isTopicsPresent(String topics) throws Exception { + List topicsList = Arrays.asList(topics.split(",")); + Set kafkaTopics = getAllTopics(); + return topicsList.stream().allMatch(kafkaTopics::contains); + } + + public Set getAllTopics() throws Exception { + try (Admin admin = Admin.create(properties)) { + ListTopicsOptions listTopicsOptions = new ListTopicsOptions(); + listTopicsOptions.listInternal(true); + return admin.listTopics(listTopicsOptions).names().get(); + } + } + + public Map describeTopic(String topic) throws Exception { + try (Admin admin = Admin.create(properties)) { + DescribeTopicsResult result = admin.describeTopics(Collections.singleton(topic)); + return result.all().get(); + } + } }