From 932e8855a4113912085ef5b62deefe23c331f29b Mon Sep 17 00:00:00 2001 From: James Cheng Date: Mon, 4 Feb 2019 01:51:58 -0800 Subject: [PATCH 1/7] Watch the zookeeper node for the topic we are deleting, and wait for it to disappear before we proceed. --- build.gradle | 2 + .../java/hello/TopicManagerController.java | 48 ++++++++++++++++--- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/build.gradle b/build.gradle index b0fc393..8baf346 100644 --- a/build.gradle +++ b/build.gradle @@ -34,5 +34,7 @@ dependencies { compile("org.apache.kafka:kafka-clients:2.1.0") compile("io.springfox:springfox-swagger2:2.9.2") compile("io.springfox:springfox-swagger-ui:2.9.2") + compile("org.apache.curator:curator-framework:4.1.0") + compile("org.apache.curator:curator-recipes:4.1.0") testCompile('org.springframework.boot:spring-boot-starter-test') } diff --git a/src/main/java/hello/TopicManagerController.java b/src/main/java/hello/TopicManagerController.java index a4b21b0..b689490 100644 --- a/src/main/java/hello/TopicManagerController.java +++ b/src/main/java/hello/TopicManagerController.java @@ -1,18 +1,24 @@ package hello; +import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DeleteTopicsResult; -import org.apache.kafka.clients.admin.DescribeTopicsResult; -import org.apache.kafka.clients.admin.TopicDescription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; @@ -29,7 +35,7 @@ public class TopicManagerController { private static final Logger logger = LoggerFactory.getLogger(TopicManagerController.class); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); - private static final int DELETE_INTERVAL_SECONDS = 60; + private static final int DELETE_INTERVAL_SECONDS = 3; private final LinkedBlockingQueue deleteQueue = new LinkedBlockingQueue(); @@ -50,7 +56,7 @@ public String queueDeleteTopic( } @Scheduled(fixedDelay = DELETE_INTERVAL_SECONDS * 1000) - public void deleteTopics() throws InterruptedException, ExecutionException { + public void deleteTopics() throws Exception { logger.info("Fixed Delay Task :: Execution Time - {}", dateTimeFormatter.format(LocalDateTime.now())); ScheduledTopicDelete scheduledDelete = deleteQueue.take(); @@ -58,10 +64,40 @@ public void deleteTopics() throws InterruptedException, ExecutionException { String broker = scheduledDelete.getBroker(); Properties adminClientProperties = new Properties(); adminClientProperties.put("bootstrap.servers", broker + ":9092"); - try (AdminClient client = AdminClient.create(adminClientProperties)) { + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + + + + try (AdminClient client = AdminClient.create(adminClientProperties); + CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(broker + ":2181", retryPolicy)) { + + zookeeperClient.start(); + zookeeperClient.blockUntilConnected(); + + CountDownLatch latch = new CountDownLatch(1); + final NodeCache nodeCache = new NodeCache(zookeeperClient, "/admin/delete_topics/" + topic); + nodeCache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + ChildData currentData = nodeCache.getCurrentData(); + if (currentData == null) { + logger.info("data change watched, and current data = null"); + latch.countDown(); + } else { + logger.info("node " + currentData.getPath() + " changed"); + } + } + }); + nodeCache.start(); + nodeCache.rebuild(); DeleteTopicsResult future = client.deleteTopics(Collections.singleton(topic)); Void result = future.all().get(); logger.info("deleted " + topic); + + latch.await(); + nodeCache.close(); + } } From 2f1b00ca298f8cda52cc65151b470be373fe6227 Mon Sep 17 00:00:00 2001 From: James Cheng Date: Mon, 4 Feb 2019 01:57:39 -0800 Subject: [PATCH 2/7] Do try-with-resources to close the NodeCache --- .../java/hello/TopicManagerController.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/main/java/hello/TopicManagerController.java b/src/main/java/hello/TopicManagerController.java index b689490..4184204 100644 --- a/src/main/java/hello/TopicManagerController.java +++ b/src/main/java/hello/TopicManagerController.java @@ -76,27 +76,27 @@ public void deleteTopics() throws Exception { zookeeperClient.blockUntilConnected(); CountDownLatch latch = new CountDownLatch(1); - final NodeCache nodeCache = new NodeCache(zookeeperClient, "/admin/delete_topics/" + topic); - nodeCache.getListenable().addListener(new NodeCacheListener() { - @Override - public void nodeChanged() throws Exception { - ChildData currentData = nodeCache.getCurrentData(); - if (currentData == null) { - logger.info("data change watched, and current data = null"); - latch.countDown(); - } else { - logger.info("node " + currentData.getPath() + " changed"); - } - } - }); - nodeCache.start(); - nodeCache.rebuild(); - DeleteTopicsResult future = client.deleteTopics(Collections.singleton(topic)); - Void result = future.all().get(); - logger.info("deleted " + topic); - - latch.await(); - nodeCache.close(); + try (NodeCache nodeCache = new NodeCache(zookeeperClient, "/admin/delete_topics/" + topic)) { + nodeCache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + ChildData currentData = nodeCache.getCurrentData(); + if (currentData == null) { + logger.info("data change watched, and current data = null"); + latch.countDown(); + } else { + logger.info("node " + currentData.getPath() + " changed"); + } + } + }); + nodeCache.start(); + nodeCache.rebuild(); + DeleteTopicsResult future = client.deleteTopics(Collections.singleton(topic)); + Void result = future.all().get(); + logger.info("deleted " + topic); + + latch.await(); + } } } From 30a613505142d06b433af1dd596300259c94addb Mon Sep 17 00:00:00 2001 From: James Cheng Date: Thu, 14 Feb 2019 22:52:52 -0800 Subject: [PATCH 3/7] Clean up logging. --- .../wushujames/topicmanager/TopicManagerController.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java index 6411e5d..3d17b8d 100644 --- a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java +++ b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java @@ -76,16 +76,15 @@ public void deleteTopics() throws Exception { zookeeperClient.blockUntilConnected(); CountDownLatch latch = new CountDownLatch(1); - try (NodeCache nodeCache = new NodeCache(zookeeperClient, "/admin/delete_topics/" + topic)) { + String zkpath = "/admin/delete_topics/" + topic; + try (NodeCache nodeCache = new NodeCache(zookeeperClient, zkpath)) { nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData currentData = nodeCache.getCurrentData(); if (currentData == null) { - logger.info("data change watched, and current data = null"); + logger.info("path " + zkpath + " disappeared, which means the topic has successfully been deleted"); latch.countDown(); - } else { - logger.info("node " + currentData.getPath() + " changed"); } } }); From 16d8edf4b39f2ec59c5ef509f28ed17ade718f28 Mon Sep 17 00:00:00 2001 From: James Cheng Date: Thu, 14 Feb 2019 22:53:29 -0800 Subject: [PATCH 4/7] We don't use this return value, so need to store it. --- .../com/wushujames/topicmanager/TopicManagerController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java index 3d17b8d..f9368a6 100644 --- a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java +++ b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java @@ -91,7 +91,7 @@ public void nodeChanged() throws Exception { nodeCache.start(); nodeCache.rebuild(); DeleteTopicsResult future = client.deleteTopics(Collections.singleton(topic)); - Void result = future.all().get(); + future.all().get(); logger.info("deleted " + topic); latch.await(); From e3089a3e1e55cd86ab8d85ab3fcc090a0e481633 Mon Sep 17 00:00:00 2001 From: James Cheng Date: Thu, 14 Feb 2019 22:54:35 -0800 Subject: [PATCH 5/7] Remove unused import. --- .../java/com/wushujames/topicmanager/TopicManagerController.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java index f9368a6..d18ab28 100644 --- a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java +++ b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java @@ -1,6 +1,5 @@ package com.wushujames.topicmanager; -import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Collection; From c56aeee07078ea9fc7a3bf259002411c6d6ce24d Mon Sep 17 00:00:00 2001 From: James Cheng Date: Thu, 14 Feb 2019 22:59:23 -0800 Subject: [PATCH 6/7] Switch from tabs to spaces --- .../topicmanager/TopicManagerController.java | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java index d18ab28..af4ad70 100644 --- a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java +++ b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java @@ -31,27 +31,27 @@ @RestController public class TopicManagerController { - private static final Logger logger = LoggerFactory.getLogger(TopicManagerController.class); + private static final Logger logger = LoggerFactory.getLogger(TopicManagerController.class); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); private static final int DELETE_INTERVAL_SECONDS = 3; private final LinkedBlockingQueue deleteQueue = new LinkedBlockingQueue(); - + @RequestMapping(value = "/deletions", method = RequestMethod.GET) public Collection listDeletions() { - return deleteQueue; + return deleteQueue; } @RequestMapping(value="/broker/{broker}/topic/{topic}", method = RequestMethod.DELETE) @ApiOperation(value = "Queue a topic for deletion.", - notes = "Deletes will happen one at a time, every " + DELETE_INTERVAL_SECONDS + " seconds.") - public String queueDeleteTopic( - @ApiParam("Hostname of one of the brokers where this topic can be found") @PathVariable("broker") String broker, - @ApiParam("Topic to delete")@PathVariable("topic") String topic) throws InterruptedException, ExecutionException { - deleteQueue.add(new ScheduledTopicDelete(broker, topic)); - return "scheduled deletion for " + topic + " from broker " + broker; + notes = "Deletes will happen one at a time, every " + DELETE_INTERVAL_SECONDS + " seconds.") + public String queueDeleteTopic( + @ApiParam("Hostname of one of the brokers where this topic can be found") @PathVariable("broker") String broker, + @ApiParam("Topic to delete")@PathVariable("topic") String topic) throws InterruptedException, ExecutionException { + deleteQueue.add(new ScheduledTopicDelete(broker, topic)); + return "scheduled deletion for " + topic + " from broker " + broker; } @Scheduled(fixedDelay = DELETE_INTERVAL_SECONDS * 1000) @@ -60,44 +60,44 @@ public void deleteTopics() throws Exception { ScheduledTopicDelete scheduledDelete = deleteQueue.take(); String topic = scheduledDelete.getTopic(); - String broker = scheduledDelete.getBroker(); + String broker = scheduledDelete.getBroker(); Properties adminClientProperties = new Properties(); adminClientProperties.put("bootstrap.servers", broker + ":9092"); - + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); - - + + try (AdminClient client = AdminClient.create(adminClientProperties); - CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(broker + ":2181", retryPolicy)) { - - zookeeperClient.start(); - zookeeperClient.blockUntilConnected(); - - CountDownLatch latch = new CountDownLatch(1); - String zkpath = "/admin/delete_topics/" + topic; - try (NodeCache nodeCache = new NodeCache(zookeeperClient, zkpath)) { - nodeCache.getListenable().addListener(new NodeCacheListener() { - @Override - public void nodeChanged() throws Exception { - ChildData currentData = nodeCache.getCurrentData(); - if (currentData == null) { - logger.info("path " + zkpath + " disappeared, which means the topic has successfully been deleted"); - latch.countDown(); - } - } - }); - nodeCache.start(); - nodeCache.rebuild(); - DeleteTopicsResult future = client.deleteTopics(Collections.singleton(topic)); - future.all().get(); - logger.info("deleted " + topic); - - latch.await(); - } - + CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(broker + ":2181", retryPolicy)) { + + zookeeperClient.start(); + zookeeperClient.blockUntilConnected(); + + CountDownLatch latch = new CountDownLatch(1); + String zkpath = "/admin/delete_topics/" + topic; + try (NodeCache nodeCache = new NodeCache(zookeeperClient, zkpath)) { + nodeCache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + ChildData currentData = nodeCache.getCurrentData(); + if (currentData == null) { + logger.info("path " + zkpath + " disappeared, which means the topic has successfully been deleted"); + latch.countDown(); + } + } + }); + nodeCache.start(); + nodeCache.rebuild(); + DeleteTopicsResult future = client.deleteTopics(Collections.singleton(topic)); + future.all().get(); + logger.info("deleted " + topic); + + latch.await(); + } + } } - + } From 1f576b9fa13ec32d9082754a80bd4ff6b4a5bd0e Mon Sep 17 00:00:00 2001 From: James Cheng Date: Thu, 14 Feb 2019 23:00:36 -0800 Subject: [PATCH 7/7] Whitespace change --- .../com/wushujames/topicmanager/TopicManagerController.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java index af4ad70..f34268b 100644 --- a/src/main/java/com/wushujames/topicmanager/TopicManagerController.java +++ b/src/main/java/com/wushujames/topicmanager/TopicManagerController.java @@ -66,8 +66,6 @@ public void deleteTopics() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); - - try (AdminClient client = AdminClient.create(adminClientProperties); CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(broker + ":2181", retryPolicy)) {