Skip to content

Commit

Permalink
completed feature
Browse files Browse the repository at this point in the history
  • Loading branch information
m1a2st committed Mar 2, 2025
1 parent 98bb79e commit 33bd2a5
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

/**
* Quota callback interface for brokers and controllers that enables customization of client quota computation.
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the interceptor to register metrics.
* The following tags are automatically added to all metrics registered:
* <code>config</code> set to <code>interceptor.classes</code>, and <code>class</code> set to the ConsumerInterceptor
* class name.
*/
public interface ClientQuotaCallback extends Configurable {

Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/kafka/server/ClientRequestQuotaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import kafka.network.RequestChannel;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;
Expand Down Expand Up @@ -49,8 +50,13 @@ public class ClientRequestQuotaManager extends ClientQuotaManager {
// Visible for testing
private final Sensor exemptSensor;

public ClientRequestQuotaManager(ClientQuotaManagerConfig config, Metrics metrics, Time time, String threadNamePrefix, Optional<ClientQuotaCallback> quotaCallback) {
super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, OptionConverters.toScala(quotaCallback));
public ClientRequestQuotaManager(
ClientQuotaManagerConfig config,
Metrics metrics, Time time,
String threadNamePrefix,
Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
) {
super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, OptionConverters.toScala(quotaCallbackPlugin));
this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
this.metrics = metrics;
this.exemptMetricName = metrics.metricName("exempt-request-time", QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization percentage");
Expand Down
22 changes: 14 additions & 8 deletions core/src/main/java/kafka/server/QuotaFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kafka.server;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
Expand Down Expand Up @@ -114,20 +115,25 @@ public void shutdown() {
}

public static QuotaManagers instantiate(KafkaConfig cfg, Metrics metrics, Time time, String threadNamePrefix) {
ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, ClientQuotaCallback.class);

Plugin<ClientQuotaCallback> clientQuotaCallbackPlugin = createClientQuotaCallback(cfg, metrics);

return new QuotaManagers(
new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, threadNamePrefix, Optional.ofNullable(clientQuotaCallback)),
new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time, threadNamePrefix, Option.apply(clientQuotaCallback)),
new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.FETCH, time, threadNamePrefix, Option.apply(clientQuotaCallbackPlugin)),
new ClientQuotaManager(clientConfig(cfg), metrics, QuotaType.PRODUCE, time, threadNamePrefix, Option.apply(clientQuotaCallbackPlugin)),
new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, threadNamePrefix, Optional.of(clientQuotaCallbackPlugin)),
new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time, threadNamePrefix, Option.apply(clientQuotaCallbackPlugin)),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, QuotaType.LEADER_REPLICATION, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, QuotaType.FOLLOWER_REPLICATION, time),
new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, QuotaType.ALTER_LOG_DIRS_REPLICATION, time),
Optional.ofNullable(clientQuotaCallback)
Optional.ofNullable(clientQuotaCallbackPlugin.get())
);
}

private static Plugin<ClientQuotaCallback> createClientQuotaCallback(KafkaConfig cfg, Metrics metrics) {
ClientQuotaCallback clientQuotaCallback = cfg.getConfiguredInstance(
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, ClientQuotaCallback.class);
return Plugin.wrapInstance(clientQuotaCallback, metrics, QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG);
}

private static ClientQuotaManagerConfig clientConfig(KafkaConfig cfg) {
return new ClientQuotaManagerConfig(
Expand Down
21 changes: 16 additions & 5 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.server.ClientQuotaManager._
import kafka.utils.Logging
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.{Cluster, MetricName}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.Metrics
Expand Down Expand Up @@ -137,23 +138,33 @@ object ClientQuotaManager {
* @param quotaType Quota type of this quota manager
* @param time @Time object to use
* @param threadNamePrefix The thread prefix to use
* @param clientQuotaCallback An optional @ClientQuotaCallback
* @param clientQuotaCallbackPlugin An optional @ClientQuotaCallback and
* warp it in a {@link org.apache.kafka.common.internals.Plugin}
*/
class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
private val quotaType: QuotaType,
private val time: Time,
private val threadNamePrefix: String,
private val clientQuotaCallback: Option[ClientQuotaCallback] = None) extends Logging {
private val clientQuotaCallbackPlugin: Option[Plugin[ClientQuotaCallback]] = None) extends Logging {

private val lock = new ReentrantReadWriteLock()
private val sensorAccessor = new SensorAccess(lock, metrics)
private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback)
private val quotaCallback = clientQuotaCallbackPlugin match {
case Some(plugin) => if (plugin.get() == null)
new DefaultQuotaCallback
else
plugin.get()
case None => new DefaultQuotaCallback
}
private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)

@volatile
private var quotaTypesEnabled = clientQuotaCallback match {
case Some(_) => QuotaTypes.CustomQuotas
private var quotaTypesEnabled = clientQuotaCallbackPlugin match {
case Some(plugin) => if (plugin.get() == null)
QuotaTypes.NoQuotas
else
QuotaTypes.CustomQuotas
case None => QuotaTypes.NoQuotas
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka.server
import kafka.network.RequestChannel
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.QuotaViolationException
import org.apache.kafka.common.metrics.Sensor
Expand Down Expand Up @@ -165,7 +166,7 @@ class ControllerMutationQuotaManager(private val config: ClientQuotaManagerConfi
private val metrics: Metrics,
private val time: Time,
private val threadNamePrefix: String,
private val quotaCallback: Option[ClientQuotaCallback])
private val quotaCallback: Option[Plugin[ClientQuotaCallback]])
extends ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, time, threadNamePrefix, quotaCallback) {

override protected def clientQuotaMetricName(quotaMetricTags: Map[String, String]): MetricName = {
Expand Down
85 changes: 66 additions & 19 deletions core/src/test/java/kafka/test/api/CustomQuotaCallbackTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,48 @@
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.test.junit.ClusterTestExtensions;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.server.quota.ClientQuotaCallback;
import org.apache.kafka.server.quota.ClientQuotaEntity;
import org.apache.kafka.server.quota.ClientQuotaType;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@ClusterTestDefaults(controllers = 3,
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
@ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
@ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
}
)
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@ExtendWith(ClusterTestExtensions.class)
public class CustomQuotaCallbackTest {

private final ClusterInstance cluster;

public CustomQuotaCallbackTest(ClusterInstance clusterInstance) {
this.cluster = clusterInstance;
}

@ClusterTest
public void testCustomQuotaCallbackWithControllerServer() throws InterruptedException {

@ClusterTest(
controllers = 3,
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
@ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
@ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "kafka.test.api.CustomQuotaCallbackTest$CustomQuotaCallback"),
}
)
public void testCustomQuotaCallbackWithControllerServer(ClusterInstance cluster) throws InterruptedException {

try (Admin admin = cluster.admin(Map.of())) {
admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1)));
Expand All @@ -80,7 +83,29 @@ public void testCustomQuotaCallbackWithControllerServer() throws InterruptedExce
}
}


@Test
public void testCreateMonitorCustomQuotaCallback() {
Metrics metrics = new Metrics();
assertEquals(1, metrics.metrics().size());
Plugin<ClientQuotaCallback> clientQuotaCallbackPlugin = Plugin.wrapInstance(
new MonitorCustomQuotaCallback(),
metrics,
QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG
);
MonitorCustomQuotaCallback clientQuotaCallback = (MonitorCustomQuotaCallback) clientQuotaCallbackPlugin.get();
assertEquals(MonitorCustomQuotaCallback.class, clientQuotaCallback.getClass());
MetricName metricName = null;
for (MetricName name : metrics.metrics().keySet()) {
if (name.name().equals(clientQuotaCallback.metricName.name())) {
metricName = name;
}
}
assertNotNull(metricName);
assertEquals(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, metricName.tags().get("config"));
assertEquals(MonitorCustomQuotaCallback.class.getSimpleName(), metricName.tags().get("class"));
assertEquals(0, metrics.metric(metricName).metricValue());
}

public static class CustomQuotaCallback implements ClientQuotaCallback {

public static final Map<String, AtomicInteger> COUNTERS = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -128,4 +153,26 @@ public void configure(Map<String, ?> configs) {
}

}

public static class MonitorCustomQuotaCallback extends CustomQuotaCallback implements Monitorable {

public final AtomicInteger counter = new AtomicInteger();
public static MetricName metricName = null;

@Override
public void withPluginMetrics(PluginMetrics metrics) {
metricName = metrics.metricName(
"client quota callback count",
"Number of times client quota callback is triggered",
Map.of()
);
metrics.addMetric(metricName, (Gauge<Integer>) (config, now) -> counter.get());
}

@Override
public boolean updateClusterMetadata(Cluster cluster) {
counter.incrementAndGet();
return true;
}
}
}

0 comments on commit 33bd2a5

Please sign in to comment.