Skip to content

Commit

Permalink
Merge pull request #1 from wushujames/watch-zookeeper-new
Browse files Browse the repository at this point in the history
Wait until a topic is deleted before proceeding onto the next one. Do this by watching the znode and waiting for it to disappear.
  • Loading branch information
wushujames authored Feb 15, 2019
2 parents 4644519 + 1f576b9 commit 7b34a1c
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 20 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ dependencies {
compile("org.apache.kafka:kafka-clients:2.1.0")
compile("io.springfox:springfox-swagger2:2.9.2")
compile("io.springfox:springfox-swagger-ui:2.9.2")
compile("org.apache.curator:curator-framework:4.1.0")
compile("org.apache.curator:curator-recipes:4.1.0")
testCompile('org.springframework.boot:spring-boot-starter-test')
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -26,44 +31,71 @@

@RestController
public class TopicManagerController {
private static final Logger logger = LoggerFactory.getLogger(TopicManagerController.class);
private static final Logger logger = LoggerFactory.getLogger(TopicManagerController.class);
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");

private static final int DELETE_INTERVAL_SECONDS = 60;
private static final int DELETE_INTERVAL_SECONDS = 3;

private final LinkedBlockingQueue<ScheduledTopicDelete> deleteQueue = new LinkedBlockingQueue<ScheduledTopicDelete>();


@RequestMapping(value = "/deletions", method = RequestMethod.GET)
public Collection<ScheduledTopicDelete> listDeletions() {
return deleteQueue;
return deleteQueue;
}

@RequestMapping(value="/broker/{broker}/topic/{topic}", method = RequestMethod.DELETE)
@ApiOperation(value = "Queue a topic for deletion.",
notes = "Deletes will happen one at a time, every " + DELETE_INTERVAL_SECONDS + " seconds.")
public String queueDeleteTopic(
@ApiParam("Hostname of one of the brokers where this topic can be found") @PathVariable("broker") String broker,
@ApiParam("Topic to delete")@PathVariable("topic") String topic) throws InterruptedException, ExecutionException {
deleteQueue.add(new ScheduledTopicDelete(broker, topic));
return "scheduled deletion for " + topic + " from broker " + broker;
notes = "Deletes will happen one at a time, every " + DELETE_INTERVAL_SECONDS + " seconds.")
public String queueDeleteTopic(
@ApiParam("Hostname of one of the brokers where this topic can be found") @PathVariable("broker") String broker,
@ApiParam("Topic to delete")@PathVariable("topic") String topic) throws InterruptedException, ExecutionException {
deleteQueue.add(new ScheduledTopicDelete(broker, topic));
return "scheduled deletion for " + topic + " from broker " + broker;
}

@Scheduled(fixedDelay = DELETE_INTERVAL_SECONDS * 1000)
public void deleteTopics() throws InterruptedException, ExecutionException {
public void deleteTopics() throws Exception {
logger.info("Fixed Delay Task :: Execution Time - {}", dateTimeFormatter.format(LocalDateTime.now()));

ScheduledTopicDelete scheduledDelete = deleteQueue.take();
String topic = scheduledDelete.getTopic();
String broker = scheduledDelete.getBroker();
String broker = scheduledDelete.getBroker();
Properties adminClientProperties = new Properties();
adminClientProperties.put("bootstrap.servers", broker + ":9092");
try (AdminClient client = AdminClient.create(adminClientProperties)) {
DeleteTopicsResult future = client.deleteTopics(Collections.singleton(topic));
Void result = future.all().get();
logger.info("deleted " + topic);

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

try (AdminClient client = AdminClient.create(adminClientProperties);
CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient(broker + ":2181", retryPolicy)) {

zookeeperClient.start();
zookeeperClient.blockUntilConnected();

CountDownLatch latch = new CountDownLatch(1);
String zkpath = "/admin/delete_topics/" + topic;
try (NodeCache nodeCache = new NodeCache(zookeeperClient, zkpath)) {
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
if (currentData == null) {
logger.info("path " + zkpath + " disappeared, which means the topic has successfully been deleted");
latch.countDown();
}
}
});
nodeCache.start();
nodeCache.rebuild();
DeleteTopicsResult future = client.deleteTopics(Collections.singleton(topic));
future.all().get();
logger.info("deleted " + topic);

latch.await();
}

}
}

}

0 comments on commit 7b34a1c

Please sign in to comment.