Skip to content

Commit

Permalink
Kafka 0.10.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
mkalen committed Jun 5, 2019
1 parent 16eca46 commit 99683a5
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 53 deletions.
16 changes: 8 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
<spring.boot.version>1.3.6.RELEASE</spring.boot.version>
<additionalparam>-Xdoclint:none</additionalparam>
<curator.version>2.10.0</curator.version>
<kafka.version>0.10.2.2</kafka.version>

<!-- name of parameter changed in latest mvn javadoc plugin version-->
<additionalOptions>-Xdoclint:none</additionalOptions>

</properties>

<scm>
Expand Down Expand Up @@ -71,8 +71,13 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.2.2</version>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
Expand All @@ -94,11 +99,6 @@
<artifactId>avro</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down
28 changes: 26 additions & 2 deletions src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package com.homeadvisor.kafdrop.model;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Date;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonProperty;

public class BrokerVO
{
Expand All @@ -30,6 +32,8 @@ public class BrokerVO
private int jmxPort;
private int version;
private boolean controller;
private Map<String,String> listenerSecurityProtocolMap;
private List<String> endpoints;
private Date timestamp;

public int getId()
Expand Down Expand Up @@ -102,4 +106,24 @@ public void setController(boolean controller)
{
this.controller = controller;
}

public Map<String,String> getListenerSecurityProtocolMap()
{
return this.listenerSecurityProtocolMap;
}

public void setListenerSecurityProtocolMap(Map<String,String> listenerSecurityProtocolMap)
{
this.listenerSecurityProtocolMap = listenerSecurityProtocolMap;
}

public List<String> getEndpoints()
{
return this.endpoints;
}

public void setEndpoints(List<String> endpoints)
{
this.endpoints = endpoints;
}
}
126 changes: 83 additions & 43 deletions src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,81 @@

package com.homeadvisor.kafdrop.service;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.homeadvisor.kafdrop.model.*;
import com.homeadvisor.kafdrop.model.BrokerVO;
import com.homeadvisor.kafdrop.model.ClusterSummaryVO;
import com.homeadvisor.kafdrop.model.ConsumerPartitionVO;
import com.homeadvisor.kafdrop.model.ConsumerRegistrationVO;
import com.homeadvisor.kafdrop.model.ConsumerTopicVO;
import com.homeadvisor.kafdrop.model.ConsumerVO;
import com.homeadvisor.kafdrop.model.TopicPartitionStateVO;
import com.homeadvisor.kafdrop.model.TopicPartitionVO;
import com.homeadvisor.kafdrop.model.TopicRegistrationVO;
import com.homeadvisor.kafdrop.model.TopicVO;
import com.homeadvisor.kafdrop.util.BrokerChannel;
import com.homeadvisor.kafdrop.util.Version;
import kafka.api.ConsumerMetadataRequest;

import kafka.api.GroupCoordinatorRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.GroupCoordinatorResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.network.BlockingChannel;
import kafka.server.ConfigType;
import kafka.utils.ZKGroupDirs;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

@Service
public class CuratorKafkaMonitor implements KafkaMonitor
Expand Down Expand Up @@ -103,6 +139,9 @@ public void start() throws Exception

threadPool = new ForkJoinPool(properties.getThreadPoolSize());

objectMapper.setPropertyNamingStrategy(
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);

FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(properties.getRetry().getBackoffMillis());

Expand All @@ -117,7 +156,8 @@ public void start() throws Exception

cacheInitCounter.set(4);

brokerPathCache = new PathChildrenCache(curatorFramework, ZkUtils.BrokerIdsPath(), true);
// TODO: re-factor to org.apache.kafka.clients.admin.AdminClient;
brokerPathCache = new PathChildrenCache(curatorFramework, "/brokers/ids", true);
brokerPathCache.getListenable().addListener(new BrokerListener());
brokerPathCache.getListenable().addListener((f, e) -> {
if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED)
Expand All @@ -128,7 +168,7 @@ public void start() throws Exception
});
brokerPathCache.start(StartMode.POST_INITIALIZED_EVENT);

topicConfigPathCache = new PathChildrenCache(curatorFramework, ZkUtils.TopicConfigPath(), true);
topicConfigPathCache = new PathChildrenCache(curatorFramework, "/config/topics", true);
topicConfigPathCache.getListenable().addListener((f, e) -> {
if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED)
{
Expand All @@ -138,7 +178,7 @@ public void start() throws Exception
});
topicConfigPathCache.start(StartMode.POST_INITIALIZED_EVENT);

topicTreeCache = new TreeCache(curatorFramework, ZkUtils.BrokerTopicsPath());
topicTreeCache = new TreeCache(curatorFramework, "/brokers/topics");
topicTreeCache.getListenable().addListener((client, event) -> {
if (event.getType() == TreeCacheEvent.Type.INITIALIZED)
{
Expand All @@ -148,7 +188,7 @@ public void start() throws Exception
});
topicTreeCache.start();

consumerTreeCache = new TreeCache(curatorFramework, ZkUtils.ConsumersPath());
consumerTreeCache = new TreeCache(curatorFramework, "/consumers");
consumerTreeCache.getListenable().addListener((client, event) -> {
if (event.getType() == TreeCacheEvent.Type.INITIALIZED)
{
Expand All @@ -158,7 +198,7 @@ public void start() throws Exception
});
consumerTreeCache.start();

controllerNodeCache = new NodeCache(curatorFramework, ZkUtils.ControllerPath());
controllerNodeCache = new NodeCache(curatorFramework, "/controller");
controllerNodeCache.getListenable().addListener(this::updateController);
controllerNodeCache.start(true);
updateController();
Expand Down Expand Up @@ -213,7 +253,7 @@ public void stop() throws IOException

private int brokerId(ChildData input)
{
return Integer.parseInt(StringUtils.substringAfter(input.getPath(), ZkUtils.BrokerIdsPath() + "/"));
return Integer.parseInt(StringUtils.substringAfter(input.getPath(), "/brokers/ids/"));
}

private BrokerVO addBroker(BrokerVO broker)
Expand Down Expand Up @@ -392,7 +432,7 @@ public TopicVO parseZkTopic(ChildData input)
objectMapper.reader(TopicRegistrationVO.class).readValue(input.getData());

topic.setConfig(
Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName()))
Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.getEntityConfigPath(ConfigType.Topic()) + "/" + topic.getName()))
.map(this::readTopicConfig)
.orElse(Collections.emptyMap()));

Expand Down Expand Up @@ -437,7 +477,7 @@ private Map<String, TopicVO> getTopicMetadata(BlockingChannel channel, String...

channel.send(request);
final kafka.api.TopicMetadataResponse underlyingResponse =
kafka.api.TopicMetadataResponse.readFrom(channel.receive().buffer());
kafka.api.TopicMetadataResponse.readFrom(channel.receive().payload());

LOG.debug("Received topic metadata response: {}", underlyingResponse);

Expand All @@ -453,7 +493,7 @@ private TopicVO processTopicMetadata(TopicMetadata tmd)
TopicVO topic = new TopicVO(tmd.topic());

topic.setConfig(
Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName()))
Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.getEntityConfigPath(ConfigType.Topic()) + "/" + topic.getName()))
.map(this::readTopicConfig)
.orElse(Collections.emptyMap()));

Expand Down Expand Up @@ -482,7 +522,7 @@ private TopicPartitionVO parsePartitionMetadata(String topic, PartitionMetadata

private List<Integer> getIsr(String topic, PartitionMetadata pmd)
{
return pmd.isr().stream().map(Broker::id).collect(Collectors.toList());
return pmd.isr().stream().map(BrokerEndPoint::id).collect(Collectors.toList());
}

private Map<String, Object> readTopicConfig(ChildData d)
Expand Down Expand Up @@ -741,7 +781,7 @@ private Map<Integer, Long> getConsumerOffsets(BlockingChannel channel,
channel.send(request.underlying());

final kafka.api.OffsetFetchResponse underlyingResponse =
kafka.api.OffsetFetchResponse.readFrom(channel.receive().buffer());
kafka.api.OffsetFetchResponse.readFrom(channel.receive().payload());

LOG.debug("Received consumer offset response: {}", underlyingResponse);

Expand All @@ -766,14 +806,14 @@ private Integer offsetManagerBroker(String groupId)

private Integer offsetManagerBroker(BlockingChannel channel, String groupId)
{
final ConsumerMetadataRequest request =
new ConsumerMetadataRequest(groupId, (short) 0, 0, clientId());
final GroupCoordinatorRequest request =
new GroupCoordinatorRequest(groupId, (short) 0, 0, clientId());

LOG.debug("Sending consumer metadata request: {}", request);

channel.send(request);
ConsumerMetadataResponse response =
ConsumerMetadataResponse.readFrom(channel.receive().buffer());
GroupCoordinatorResponse response =
GroupCoordinatorResponse.readFrom(channel.receive().payload());

LOG.debug("Received consumer metadata response: {}", response);

Expand Down Expand Up @@ -866,7 +906,7 @@ private OffsetResponse sendOffsetRequest(Integer brokerId, TopicVO topic,
{
channel.send(offsetRequest.underlying());
final kafka.api.OffsetResponse underlyingResponse =
kafka.api.OffsetResponse.readFrom(channel.receive().buffer());
kafka.api.OffsetResponse.readFrom(channel.receive().payload());

LOG.debug("Received offset response: {}", underlyingResponse);

Expand Down

0 comments on commit 99683a5

Please sign in to comment.