diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala index b54d68488e2..addf680d32e 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala @@ -68,19 +68,25 @@ object KafkaMessagingProvider extends MessagingProvider { val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava) def createTopic(retries: Int = 5): Try[Unit] = { - Try(client.createTopics(List(nt).asJava).values().get(topic).get()) - .map(_ => logging.info(this, s"created topic $topic")) - .recoverWith { - case CausedBy(_: TopicExistsException) => - Success(logging.info(this, s"topic $topic already existed")) - case CausedBy(t: RetriableException) if retries > 0 => - logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries") - Thread.sleep(1.second.toMillis) - createTopic(retries - 1) - case t => - logging.error(this, s"ensureTopic for $topic failed due to $t") - Failure(t) - } + Try(client.listTopics().names().get()) + .map(topics => + if (topics.contains(topic)) { + Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation.")) + } else { + Try(client.createTopics(List(nt).asJava).values().get(topic).get()) + .map(_ => logging.info(this, s"created topic $topic")) + .recoverWith { + case CausedBy(_: TopicExistsException) => + Success(logging.info(this, s"topic $topic already existed")) + case CausedBy(t: RetriableException) if retries > 0 => + logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries") + Thread.sleep(1.second.toMillis) + createTopic(retries - 1) + case t => + logging.error(this, s"ensureTopic for $topic failed due to $t") + Failure(t) + } + }) } val result = createTopic()