From 18751156b20887447d0341d7c35b365e90700a7c Mon Sep 17 00:00:00 2001 From: dan mcweeney Date: Thu, 23 Jan 2020 13:16:11 -0500 Subject: [PATCH] Check to see if the user can see the topic before creating it, this allows lower privilege users to be set for the controller and invoker. --- .../kafka/KafkaMessagingProvider.scala | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala index b54d68488e2..addf680d32e 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala @@ -68,19 +68,25 @@ object KafkaMessagingProvider extends MessagingProvider { val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava) def createTopic(retries: Int = 5): Try[Unit] = { - Try(client.createTopics(List(nt).asJava).values().get(topic).get()) - .map(_ => logging.info(this, s"created topic $topic")) - .recoverWith { - case CausedBy(_: TopicExistsException) => - Success(logging.info(this, s"topic $topic already existed")) - case CausedBy(t: RetriableException) if retries > 0 => - logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries") - Thread.sleep(1.second.toMillis) - createTopic(retries - 1) - case t => - logging.error(this, s"ensureTopic for $topic failed due to $t") - Failure(t) - } + Try(client.listTopics().names().get()) + .map(topics => + if (topics.contains(topic)) { + Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation.")) + } else { + Try(client.createTopics(List(nt).asJava).values().get(topic).get()) + .map(_ => logging.info(this, s"created topic $topic")) + .recoverWith { + case CausedBy(_: TopicExistsException) => + Success(logging.info(this, s"topic $topic already existed")) + case CausedBy(t: RetriableException) if retries > 0 => + logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries") + Thread.sleep(1.second.toMillis) + createTopic(retries - 1) + case t => + logging.error(this, s"ensureTopic for $topic failed due to $t") + Failure(t) + } + }) } val result = createTopic()