diff --git a/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java b/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java index 237663adb3..884c40372b 100644 --- a/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java +++ b/core/src/main/java/kafka/autobalancer/AutoBalancerManager.java @@ -63,16 +63,16 @@ protected void init() { RecordClusterModel clusterModel = new RecordClusterModel(new LogContext(String.format("[ClusterModel id=%d] ", quorumController.nodeId()))); this.loadRetriever = new LoadRetriever(config, quorumController, clusterModel, new LogContext(String.format("[LoadRetriever id=%d] ", quorumController.nodeId()))); - this.actionExecutorService = new ControllerActionExecutorService(config, quorumController, + this.actionExecutorService = new ControllerActionExecutorService(quorumController, new LogContext(String.format("[ExecutionManager id=%d] ", quorumController.nodeId()))); this.actionExecutorService.start(); this.anomalyDetector = new AnomalyDetectorBuilder() .logContext(new LogContext(String.format("[AnomalyDetector id=%d] ", quorumController.nodeId()))) - .maxActionsNumPerExecution(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS)) .detectIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS)) .maxTolerateMetricsDelayMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ACCEPTED_METRICS_DELAY_MS)) - .coolDownIntervalPerActionMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS)) + .executionConcurrency(config.getInt(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY)) + .executionIntervalMs(config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS)) .clusterModel(clusterModel) .executor(this.actionExecutorService) .addGoals(config.getConfiguredInstances(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS, Goal.class)) @@ -125,7 +125,6 @@ public void validateReconfiguration(Map configs) throws ConfigExcepti ConfigUtils.getBoolean(objectConfigs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ENABLE); } this.anomalyDetector.validateReconfiguration(objectConfigs); - this.actionExecutorService.validateReconfiguration(objectConfigs); } catch (ConfigException e) { throw e; } catch (Exception e) { @@ -149,7 +148,6 @@ public void reconfigure(Map configs) { } } this.anomalyDetector.reconfigure(objectConfigs); - this.actionExecutorService.reconfigure(objectConfigs); } @Override diff --git a/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java b/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java index afb7a466b8..dd4353feac 100644 --- a/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java +++ b/core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java @@ -41,8 +41,8 @@ public class AutoBalancerControllerConfig extends AbstractConfig { public static final String AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION = PREFIX + "network.in.distribution.detect.avg.deviation"; public static final String AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD = PREFIX + "network.out.usage.distribution.detect.threshold"; public static final String AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION = PREFIX + "network.out.distribution.detect.avg.deviation"; + public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY = PREFIX + "execution.concurrency"; public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS = PREFIX + "execution.interval.ms"; - public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS = PREFIX + "execution.steps"; public static final String AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS = PREFIX + "exclude.broker.ids"; public static final String AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS = PREFIX + "exclude.topics"; /* Default values */ @@ -61,8 +61,8 @@ public class AutoBalancerControllerConfig extends AbstractConfig { public static final double DEFAULT_AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION = 0.2; public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD = 1024 * 1024; public static final double DEFAULT_AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION = 0.2; - public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS = 1000; - public static final int DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS = 60; + public static final int DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY = 50; + public static final long DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS = 5000; public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS = ""; public static final String DEFAULT_AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS = ""; /* Documents */ @@ -82,8 +82,8 @@ public class AutoBalancerControllerConfig extends AbstractConfig { public static final String AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION_DOC = "The acceptable range of deviation for average network input bandwidth usage. Default is 0.2, which means the expected network traffic range will be [0.8 * loadAvg, 1.2 * loadAvg]"; public static final String AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD_DOC = "The detection threshold of NetworkOutUsageDistributionGoal. If a broker's outbound network bandwidth usage is below this configured value, the broker will not be proactively scheduled for rebalancing."; public static final String AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION_DOC = "The acceptable range of deviation for average network out bandwidth usage. Default is 0.2, which means the expected network traffic range will be [0.8 * loadAvg, 1.2 * loadAvg]"; - public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS_DOC = "Time interval between reassignments per broker in milliseconds"; - public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS_DOC = "The max number of reassignments per broker in one execution"; + public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY_DOC = "The maximum number of actions that can be executed in a single batch per broker, including partition closing and openning"; + public static final String AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS_DOC = "Time interval between each action batch in milliseconds"; public static final String AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS_DOC = "Broker ids that auto balancer will ignore during balancing"; public static final String AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS_DOC = "Topics that auto balancer will ignore during balancing"; @@ -96,8 +96,8 @@ public class AutoBalancerControllerConfig extends AbstractConfig { AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_IN_DISTRIBUTION_DETECT_AVG_DEVIATION, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_USAGE_DISTRIBUTION_DETECT_THRESHOLD, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_NETWORK_OUT_DISTRIBUTION_DETECT_AVG_DEVIATION, + AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS, - AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS ); @@ -149,9 +149,9 @@ public class AutoBalancerControllerConfig extends AbstractConfig { .define(AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS, ConfigDef.Type.LONG, DEFAULT_AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS, ConfigDef.Importance.HIGH, AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS_DOC) - .define(AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS, ConfigDef.Type.INT, - DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS, ConfigDef.Importance.HIGH, - AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS_DOC) + .define(AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY, ConfigDef.Type.INT, + DEFAULT_AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY, ConfigDef.Importance.HIGH, + AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY_DOC) .define(AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS, ConfigDef.Type.LIST, DEFAULT_AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS, ConfigDef.Importance.HIGH, AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS_DOC) diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java index 2bb40a25be..8995964752 100644 --- a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetector.java @@ -43,6 +43,7 @@ import java.util.stream.Collectors; public class AnomalyDetector extends AbstractResumableService { + private static final double MAX_PARTITION_REASSIGNMENT_RATIO = 0.5; private final ClusterModel clusterModel; private final ScheduledExecutorService executorService; private final ActionExecutorService actionExecutor; @@ -50,22 +51,22 @@ public class AnomalyDetector extends AbstractResumableService { private volatile List goalsByPriority; private volatile Set excludedBrokers; private volatile Set excludedTopics; - private volatile int maxActionsNumPerExecution; private volatile long detectInterval; private volatile long maxTolerateMetricsDelayMs; - private volatile long coolDownIntervalPerActionMs; + private volatile int executionConcurrency; + private volatile long executionIntervalMs; private volatile boolean isLeader = false; private volatile Map slowBrokers = new HashMap<>(); - AnomalyDetector(LogContext logContext, int maxActionsNumPerDetect, long detectIntervalMs, long maxTolerateMetricsDelayMs, - long coolDownIntervalPerActionMs, ClusterModel clusterModel, ActionExecutorService actionExecutor, + AnomalyDetector(LogContext logContext, long detectIntervalMs, long maxTolerateMetricsDelayMs, int executionConcurrency, + long executionIntervalMs, ClusterModel clusterModel, ActionExecutorService actionExecutor, List goals, Set excludedBrokers, Set excludedTopics) { super(logContext); this.configChangeLock = new ReentrantLock(); - this.maxActionsNumPerExecution = maxActionsNumPerDetect; this.detectInterval = detectIntervalMs; this.maxTolerateMetricsDelayMs = maxTolerateMetricsDelayMs; - this.coolDownIntervalPerActionMs = coolDownIntervalPerActionMs; + this.executionConcurrency = executionConcurrency; + this.executionIntervalMs = executionIntervalMs; this.clusterModel = clusterModel; this.actionExecutor = actionExecutor; this.executorService = Executors.newSingleThreadScheduledExecutor(new AutoBalancerThreadFactory("anomaly-detector")); @@ -75,8 +76,8 @@ public class AnomalyDetector extends AbstractResumableService { this.excludedTopics = excludedTopics; this.executorService.schedule(this::detect, detectInterval, TimeUnit.MILLISECONDS); S3StreamKafkaMetricsManager.setSlowBrokerSupplier(() -> this.slowBrokers); - logger.info("maxActionsNumPerDetect: {}, detectInterval: {}ms, coolDownIntervalPerAction: {}ms, goals: {}, excluded brokers: {}, excluded topics: {}", - this.maxActionsNumPerExecution, this.detectInterval, this.coolDownIntervalPerActionMs, this.goalsByPriority, this.excludedBrokers, this.excludedTopics); + logger.info("detectInterval: {}ms, executionConcurrency: {}, executionIntervalMs: {}ms, goals: {}, excluded brokers: {}, excluded topics: {}", + this.detectInterval, this.executionConcurrency, this.executionIntervalMs, this.goalsByPriority, this.excludedBrokers, this.excludedTopics); } @Override @@ -127,10 +128,10 @@ public void validateReconfiguration(Map configs) throws ConfigEx throw new ConfigException(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS, detectInterval); } } - if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS)) { - long steps = ConfigUtils.getLong(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS); - if (steps < 0) { - throw new ConfigException(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_CONSUMER_POLL_TIMEOUT, steps); + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY)) { + long concurrency = ConfigUtils.getInteger(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY); + if (concurrency < 0) { + throw new ConfigException(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY, concurrency); } } if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)) { @@ -145,9 +146,9 @@ public void validateReconfiguration(Map configs) throws ConfigEx tmp.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS); } if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS)) { - long coolDownInterval = ConfigUtils.getLong(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); - if (coolDownInterval < 0) { - throw new ConfigException(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS, coolDownInterval); + long interval = ConfigUtils.getLong(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); + if (interval < 0) { + throw new ConfigException(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS, interval); } } validateGoalsReconfiguration(configs); @@ -173,8 +174,8 @@ public void reconfigure(Map configs) { if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS)) { this.detectInterval = ConfigUtils.getLong(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_ANOMALY_DETECT_INTERVAL_MS); } - if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS)) { - this.maxActionsNumPerExecution = ConfigUtils.getInteger(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_STEPS); + if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY)) { + this.executionConcurrency = ConfigUtils.getInteger(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_CONCURRENCY); } if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_BROKER_IDS)) { AutoBalancerControllerConfig tmp = new AutoBalancerControllerConfig(configs, false); @@ -186,7 +187,7 @@ public void reconfigure(Map configs) { this.excludedTopics = new HashSet<>(tmp.getList(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXCLUDE_TOPICS)); } if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS)) { - this.coolDownIntervalPerActionMs = ConfigUtils.getLong(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); + this.executionIntervalMs = ConfigUtils.getLong(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); } if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_GOALS)) { reconfigureGoals(configs); @@ -221,12 +222,13 @@ private boolean isRunnable() { return this.running.get() && this.isLeader; } - long detect0() { + long detect0() throws Exception { long detectInterval; Set excludedBrokers; Set excludedTopics; long maxTolerateMetricsDelayMs; - long coolDownIntervalPerActionMs; + int maxExecutionConcurrency; + long executionIntervalMs; List goals; configChangeLock.lock(); try { @@ -234,7 +236,8 @@ long detect0() { excludedBrokers = new HashSet<>(this.excludedBrokers); excludedTopics = new HashSet<>(this.excludedTopics); maxTolerateMetricsDelayMs = this.maxTolerateMetricsDelayMs; - coolDownIntervalPerActionMs = this.coolDownIntervalPerActionMs; + maxExecutionConcurrency = this.executionConcurrency; + executionIntervalMs = this.executionIntervalMs; goals = new ArrayList<>(this.goalsByPriority); } finally { configChangeLock.unlock(); @@ -272,15 +275,64 @@ long detect0() { return detectInterval; } int totalActionSize = totalActions.size(); - List actionsToExecute = checkAndMergeActions(totalActions); - logger.info("Total actions num: {}, executable num: {}", totalActionSize, actionsToExecute.size()); - this.actionExecutor.execute(actionsToExecute); + List> actionsToExecute = checkAndGroupActions(totalActions, maxExecutionConcurrency, getTopicPartitionCount(snapshot)); + logger.info("Total actions num: {}, split to {} batches", totalActionSize, actionsToExecute.size()); - return actionsToExecute.size() * coolDownIntervalPerActionMs + detectInterval; + for (List batch : actionsToExecute) { + this.actionExecutor.execute(batch).get(); + Thread.sleep(executionIntervalMs); + } + + return detectInterval; + } + + Map getTopicPartitionCount(ClusterModelSnapshot snapshot) { + Map topicPartitionNumMap = new HashMap<>(); + for (BrokerUpdater.Broker broker : snapshot.brokers()) { + for (TopicPartitionReplicaUpdater.TopicPartitionReplica replica : snapshot.replicasFor(broker.getBrokerId())) { + topicPartitionNumMap.put(replica.getTopicPartition().topic(), topicPartitionNumMap + .getOrDefault(replica.getTopicPartition().topic(), 0) + 1); + } + } + return topicPartitionNumMap; + } + + List> checkAndGroupActions(List actions, int maxExecutionConcurrency, Map topicPartitionNumMap) { + List mergedActions = checkAndMergeActions(actions); + List> groupedActions = new ArrayList<>(); + List batch = new ArrayList<>(); + + Map topicPartitionCountMap = new HashMap<>(); + Map brokerActionConcurrencyMap = new HashMap<>(); + for (Action action : mergedActions) { + int expectedPartitionCount = topicPartitionCountMap.getOrDefault(action.getSrcTopicPartition().topic(), 0) + 1; + int expectedSrcBrokerConcurrency = brokerActionConcurrencyMap.getOrDefault(action.getSrcBrokerId(), 0) + 1; + int expectedDestBrokerConcurrency = brokerActionConcurrencyMap.getOrDefault(action.getDestBrokerId(), 0) + 1; + int partitionLimit = (int) Math.ceil(topicPartitionNumMap + .getOrDefault(action.getSrcTopicPartition().topic(), 0) * MAX_PARTITION_REASSIGNMENT_RATIO); + if (expectedPartitionCount > partitionLimit || expectedSrcBrokerConcurrency > maxExecutionConcurrency + || expectedDestBrokerConcurrency > maxExecutionConcurrency) { + groupedActions.add(batch); + batch = new ArrayList<>(); + topicPartitionCountMap.clear(); + brokerActionConcurrencyMap.clear(); + expectedPartitionCount = 1; + expectedSrcBrokerConcurrency = 1; + expectedDestBrokerConcurrency = 1; + } + batch.add(action); + topicPartitionCountMap.put(action.getSrcTopicPartition().topic(), expectedPartitionCount); + brokerActionConcurrencyMap.put(action.getSrcBrokerId(), expectedSrcBrokerConcurrency); + brokerActionConcurrencyMap.put(action.getDestBrokerId(), expectedDestBrokerConcurrency); + } + if (!batch.isEmpty()) { + groupedActions.add(batch); + } + + return groupedActions; } List checkAndMergeActions(List actions) throws IllegalStateException { - actions = actions.subList(0, Math.min(actions.size(), maxActionsNumPerExecution)); List splitActions = new ArrayList<>(); List mergedActions = new ArrayList<>(); Map actionMergeMap = new HashMap<>(); diff --git a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java index 152472e9fb..533446a3b1 100644 --- a/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java +++ b/core/src/main/java/kafka/autobalancer/detector/AnomalyDetectorBuilder.java @@ -29,10 +29,10 @@ public class AnomalyDetectorBuilder { private LogContext logContext = null; private ClusterModel clusterModel = null; private ActionExecutorService executor = null; - private int maxActionsNumPerDetect = Integer.MAX_VALUE; private long detectIntervalMs = 60000; private long maxTolerateMetricsDelayMs = 30000; - private long coolDownIntervalPerActionMs = 100; + private int executionConcurrency = 50; + private long executionIntervalMs = 5000; public AnomalyDetectorBuilder() { @@ -83,11 +83,6 @@ public AnomalyDetectorBuilder executor(ActionExecutorService executor) { return this; } - public AnomalyDetectorBuilder maxActionsNumPerExecution(int maxActionsNumPerExecution) { - this.maxActionsNumPerDetect = maxActionsNumPerExecution; - return this; - } - public AnomalyDetectorBuilder detectIntervalMs(long detectIntervalMs) { this.detectIntervalMs = detectIntervalMs; return this; @@ -98,8 +93,13 @@ public AnomalyDetectorBuilder maxTolerateMetricsDelayMs(long maxTolerateMetricsD return this; } - public AnomalyDetectorBuilder coolDownIntervalPerActionMs(long coolDownIntervalPerActionMs) { - this.coolDownIntervalPerActionMs = coolDownIntervalPerActionMs; + public AnomalyDetectorBuilder executionConcurrency(int executionConcurrency) { + this.executionConcurrency = executionConcurrency; + return this; + } + + public AnomalyDetectorBuilder executionIntervalMs(long executionIntervalMs) { + this.executionIntervalMs = executionIntervalMs; return this; } @@ -116,7 +116,7 @@ public AnomalyDetector build() { if (goalsByPriority.isEmpty()) { throw new IllegalArgumentException("At least one goal must be set"); } - return new AnomalyDetector(logContext, maxActionsNumPerDetect, detectIntervalMs, maxTolerateMetricsDelayMs, - coolDownIntervalPerActionMs, clusterModel, executor, goalsByPriority, excludedBrokers, excludedTopics); + return new AnomalyDetector(logContext, detectIntervalMs, maxTolerateMetricsDelayMs, executionConcurrency, + executionIntervalMs, clusterModel, executor, goalsByPriority, excludedBrokers, excludedTopics); } } diff --git a/core/src/main/java/kafka/autobalancer/executor/ActionExecutorService.java b/core/src/main/java/kafka/autobalancer/executor/ActionExecutorService.java index 50621644c9..4d74d1d549 100644 --- a/core/src/main/java/kafka/autobalancer/executor/ActionExecutorService.java +++ b/core/src/main/java/kafka/autobalancer/executor/ActionExecutorService.java @@ -12,10 +12,9 @@ package kafka.autobalancer.executor; import kafka.autobalancer.common.Action; -import org.apache.kafka.common.config.ConfigException; import java.util.List; -import java.util.Map; +import java.util.concurrent.CompletableFuture; public interface ActionExecutorService { @@ -23,11 +22,5 @@ public interface ActionExecutorService { void shutdown(); - void execute(Action action); - - void execute(List actions); - - void validateReconfiguration(Map configs) throws ConfigException; - - void reconfigure(Map configs); + CompletableFuture execute(List actions); } diff --git a/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java b/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java index ad3fbc2574..7e3330cc09 100644 --- a/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java +++ b/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java @@ -15,17 +15,16 @@ import kafka.autobalancer.common.Action; import kafka.autobalancer.common.ActionType; import kafka.autobalancer.common.AutoBalancerConstants; -import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.listeners.BrokerStatusListener; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; +import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; -import org.apache.kafka.common.utils.ConfigUtils; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.KafkaThread; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.controller.Controller; import org.apache.kafka.controller.ControllerRequestContext; import org.apache.kafka.metadata.BrokerRegistrationFencingChange; @@ -33,36 +32,34 @@ import org.slf4j.Logger; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; public class ControllerActionExecutorService implements ActionExecutorService, Runnable, BrokerStatusListener { - private final BlockingQueue actionQueue = new ArrayBlockingQueue<>(1000); + private final BlockingQueue actionQueue = new ArrayBlockingQueue<>(1000); private final Set fencedBrokers = ConcurrentHashMap.newKeySet(); - private Logger logger; - private Controller controller; - private volatile long executionInterval; - private KafkaThread dispatchThread; - // TODO: optimize to per-broker concurrency control - private long lastExecutionTime = 0L; + private final Logger logger; + private final Controller controller; + private final KafkaThread dispatchThread; private volatile boolean shutdown; - public ControllerActionExecutorService(AutoBalancerControllerConfig config, Controller controller) { - this(config, controller, null); + public ControllerActionExecutorService(Controller controller) { + this(controller, null); } - public ControllerActionExecutorService(AutoBalancerControllerConfig config, Controller controller, LogContext logContext) { + public ControllerActionExecutorService(Controller controller, LogContext logContext) { if (logContext == null) { logContext = new LogContext("[ExecutionManager] "); } this.logger = logContext.logger(AutoBalancerConstants.AUTO_BALANCER_LOGGER_CLAZZ); this.controller = controller; - this.executionInterval = config.getLong(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); this.dispatchThread = KafkaThread.daemon("executor-dispatcher", this); } @@ -81,97 +78,93 @@ public void shutdown() { } @Override - public void execute(Action action) { + public CompletableFuture execute(List actions) { + CompletableFuture cf = new CompletableFuture<>(); try { - this.actionQueue.put(action); - } catch (InterruptedException ignored) { - + actionQueue.put(new Task(actions, cf)); + } catch (InterruptedException e) { + logger.error("Failed to put actions into queue", e); + cf.completeExceptionally(e); } + return cf; } @Override - public void execute(List actions) { - for (Action action : actions) { - execute(action); - } - } + public void run() { + while (!shutdown) { + try { + doReassign(actionQueue.take()); + } catch (InterruptedException ignored) { - @Override - public void validateReconfiguration(Map configs) throws ConfigException { - try { - if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS)) { - long interval = ConfigUtils.getLong(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); - if (interval < 0) { - throw new ConfigException(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS, interval); - } } - } catch (ConfigException e) { - throw e; - } catch (Exception e) { - throw new ConfigException("Reconfiguration validation error " + e.getMessage()); } } - @Override - public void reconfigure(Map configs) { - if (configs.containsKey(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS)) { - this.executionInterval = ConfigUtils.getLong(configs, AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS); + private void doReassign(Task task) { + ControllerRequestContext context = new ControllerRequestContext(null, null, OptionalLong.empty()); + AlterPartitionReassignmentsRequestData request = new AlterPartitionReassignmentsRequestData(); + List topicList = new ArrayList<>(); + + Map> topicPartitionMap = new HashMap<>(); + + for (Action action : task.actions) { + if (fencedBrokers.contains(action.getDestBrokerId())) { + logger.info("Broker {} is fenced, skip action {}", action.getDestBrokerId(), action); + continue; + } + addTopicPartition(topicPartitionMap, action.getSrcTopicPartition(), action.getDestBrokerId()); + if (action.getType() == ActionType.SWAP) { + addTopicPartition(topicPartitionMap, action.getDestTopicPartition(), action.getSrcBrokerId()); + } + } + for (Map.Entry> entry : topicPartitionMap.entrySet()) { + AlterPartitionReassignmentsRequestData.ReassignableTopic topic = new AlterPartitionReassignmentsRequestData.ReassignableTopic() + .setName(entry.getKey()); + topic.setPartitions(entry.getValue()); + topicList.add(topic); } + request.setTopics(topicList); + this.controller.alterPartitionReassignments(context, request).whenComplete((response, exception) -> { + if (exception != null) { + logger.error("Failed to alter partition reassignments", exception); + task.getFuture().completeExceptionally(exception); + } else { + handleResponse(response, task.getFuture()); + } + }); } - @Override - public void run() { - while (!shutdown) { - try { - Action action = actionQueue.take(); - if (fencedBrokers.contains(action.getDestBrokerId())) { - logger.info("Broker {} is fenced, skip action {}", action.getDestBrokerId(), action); - continue; - } - long now = System.currentTimeMillis(); - long nextExecutionTime = lastExecutionTime + executionInterval; - while (!shutdown && lastExecutionTime != 0 && now < nextExecutionTime) { - try { - Thread.sleep(nextExecutionTime - now); - } catch (InterruptedException ignored) { - break; + private void handleResponse(AlterPartitionReassignmentsResponseData response, CompletableFuture future) { + Errors topLevelError = Errors.forCode(response.errorCode()); + if (topLevelError != Errors.NONE) { + future.completeExceptionally(new ApiException("Failed to alter partition reassignments", topLevelError.exception())); + } else { + for (AlterPartitionReassignmentsResponseData.ReassignableTopicResponse topicResponse : response.responses()) { + for (AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse partitionResponse : topicResponse.partitions()) { + Errors partitionError = Errors.forCode(partitionResponse.errorCode()); + if (partitionError != Errors.NONE) { + future.completeExceptionally(new ApiException(String.format("Failed to alter partition %s-%d reassignments", + topicResponse.name(), partitionResponse.partitionIndex()), partitionError.exception())); } - now = System.currentTimeMillis(); } - if (shutdown) { - break; - } - doReassign(action); - lastExecutionTime = Time.SYSTEM.milliseconds(); - logger.info("Executing {}", action.prettyString()); - } catch (InterruptedException ignored) { } + future.complete(null); } } - private void doReassign(Action action) { - ControllerRequestContext context = new ControllerRequestContext(null, null, OptionalLong.empty()); - AlterPartitionReassignmentsRequestData request = new AlterPartitionReassignmentsRequestData(); - List topicList = new ArrayList<>(); - topicList.add(buildTopic(action.getSrcTopicPartition(), action.getDestBrokerId())); - if (action.getType() == ActionType.SWAP) { - topicList.add(buildTopic(action.getDestTopicPartition(), action.getSrcBrokerId())); - } - request.setTopics(topicList); - this.controller.alterPartitionReassignments(context, request); + private void addTopicPartition(Map> topicPartitionMap, + TopicPartition tp, int brokerId) { + List partitions = topicPartitionMap + .computeIfAbsent(tp.topic(), k -> new ArrayList<>()); + partitions.add(buildPartition(tp.partition(), brokerId)); } - private AlterPartitionReassignmentsRequestData.ReassignableTopic buildTopic(TopicPartition tp, int brokerId) { - String topicName = tp.topic(); - AlterPartitionReassignmentsRequestData.ReassignableTopic topic = new AlterPartitionReassignmentsRequestData.ReassignableTopic() - .setName(topicName) - .setPartitions(new ArrayList<>()); + private AlterPartitionReassignmentsRequestData.ReassignablePartition buildPartition(int partitionIndex, int brokerId) { AlterPartitionReassignmentsRequestData.ReassignablePartition partition = new AlterPartitionReassignmentsRequestData.ReassignablePartition(); - partition.setPartitionIndex(tp.partition()); + partition.setPartitionIndex(partitionIndex); partition.setReplicas(List.of(brokerId)); - topic.setPartitions(List.of(partition)); - return topic; + return partition; } @Override @@ -194,4 +187,22 @@ public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) { fencedBrokers.remove(record.brokerId()); } } + + private static class Task { + private final List actions; + private final CompletableFuture future; + + public Task(List actions, CompletableFuture future) { + this.actions = actions; + this.future = future; + } + + public List getActions() { + return actions; + } + + public CompletableFuture getFuture() { + return future; + } + } } diff --git a/core/src/test/java/kafka/autobalancer/ControllerActionExecutorServiceTest.java b/core/src/test/java/kafka/autobalancer/ControllerActionExecutorServiceTest.java index 4842382e31..e7de1f1c42 100644 --- a/core/src/test/java/kafka/autobalancer/ControllerActionExecutorServiceTest.java +++ b/core/src/test/java/kafka/autobalancer/ControllerActionExecutorServiceTest.java @@ -19,7 +19,6 @@ import kafka.autobalancer.common.Action; import kafka.autobalancer.common.ActionType; -import kafka.autobalancer.config.AutoBalancerControllerConfig; import kafka.autobalancer.executor.ControllerActionExecutorService; import kafka.test.MockController; import org.apache.kafka.common.TopicPartition; @@ -33,23 +32,21 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; @Tag("S3Unit") public class ControllerActionExecutorServiceTest { private boolean checkTopicPartition(AlterPartitionReassignmentsRequestData.ReassignableTopic topic, - String name, int partitionId, int nodeId) { + String name, int partitionIndex, int partitionId, int nodeId) { if (!topic.name().equals(name)) { return false; } - if (topic.partitions().size() != 1) { + if (topic.partitions().size() <= partitionIndex) { return false; } - AlterPartitionReassignmentsRequestData.ReassignablePartition partition = topic.partitions().get(0); + AlterPartitionReassignmentsRequestData.ReassignablePartition partition = topic.partitions().get(partitionIndex); if (partition.partitionIndex() != partitionId) { return false; } @@ -58,10 +55,6 @@ private boolean checkTopicPartition(AlterPartitionReassignmentsRequestData.Reass @Test public void testExecuteActions() throws Exception { - Map props = new HashMap<>(); - props.put(AutoBalancerControllerConfig.AUTO_BALANCER_CONTROLLER_EXECUTION_INTERVAL_MS, 100L); - AutoBalancerControllerConfig config = new AutoBalancerControllerConfig(props, false); - Controller controller = Mockito.mock(MockController.class); final ArgumentCaptor ctxCaptor = ArgumentCaptor.forClass(ControllerRequestContext.class); @@ -70,7 +63,7 @@ public void testExecuteActions() throws Exception { Mockito.doAnswer(answer -> CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData())) .when(controller).alterPartitionReassignments(ctxCaptor.capture(), reqCaptor.capture()); - ControllerActionExecutorService controllerActionExecutorService = new ControllerActionExecutorService(config, controller); + ControllerActionExecutorService controllerActionExecutorService = new ControllerActionExecutorService(controller); controllerActionExecutorService.start(); List actionList = List.of( @@ -80,22 +73,16 @@ public void testExecuteActions() throws Exception { TestUtils.waitForCondition(() -> { List reqs = reqCaptor.getAllValues(); - if (reqs.size() != 2) { + if (reqs.size() != 1) { return false; } AlterPartitionReassignmentsRequestData reqMove = reqs.get(0); - if (reqMove.topics().size() != 1) { - return false; - } - if (!checkTopicPartition(reqMove.topics().get(0), "topic1", 0, 1)) { - return false; - } - AlterPartitionReassignmentsRequestData reqSwap = reqs.get(1); - if (reqSwap.topics().size() != 2) { + if (reqMove.topics().size() != 2) { return false; } - return checkTopicPartition(reqSwap.topics().get(0), "topic2", 0, 1) - && checkTopicPartition(reqSwap.topics().get(1), "topic1", 1, 0); + return checkTopicPartition(reqMove.topics().get(0), "topic1", 0, 0, 1) && + checkTopicPartition(reqMove.topics().get(0), "topic1", 1, 1, 0) && + checkTopicPartition(reqMove.topics().get(1), "topic2", 0, 0, 1); }, 5000L, 1000L, () -> "failed to meet reassign"); controllerActionExecutorService.shutdown(); diff --git a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java index 7f756cd99c..b178a61e51 100644 --- a/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java +++ b/core/src/test/java/kafka/autobalancer/detector/AnomalyDetectorTest.java @@ -33,9 +33,9 @@ import org.apache.commons.math3.distribution.PoissonDistribution; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.mockito.Mockito; import java.util.ArrayList; @@ -46,6 +46,7 @@ import java.util.Map; import java.util.Random; import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class AnomalyDetectorTest { @@ -67,23 +68,8 @@ public void shutdown() { } @Override - public void execute(Action action) { - - } - - @Override - public void execute(List actions) { - - } - - @Override - public void validateReconfiguration(Map configs) throws ConfigException { - - } - - @Override - public void reconfigure(Map configs) { - + public CompletableFuture execute(List actions) { + return CompletableFuture.completedFuture(null); } }) .build(); @@ -141,6 +127,75 @@ public void reconfigure(Map configs) { } @Test + public void testGroupActions() { + AnomalyDetector anomalyDetector = new AnomalyDetectorBuilder() + .clusterModel(Mockito.mock(ClusterModel.class)) + .addGoal(Mockito.mock(Goal.class)) + .executor(new ActionExecutorService() { + @Override + public void start() { + + } + + @Override + public void shutdown() { + + } + + @Override + public CompletableFuture execute(List actions) { + return CompletableFuture.completedFuture(null); + } + }) + .build(); + + List actions = List.of( + new Action(ActionType.MOVE, new TopicPartition("topic-1", 0), 0, 1), + new Action(ActionType.MOVE, new TopicPartition("topic-1", 1), 0, 2), + new Action(ActionType.MOVE, new TopicPartition("topic-1", 2), 0, 3), + new Action(ActionType.MOVE, new TopicPartition("topic-1", 3), 0, 4), + new Action(ActionType.MOVE, new TopicPartition("topic-2", 0), 1, 5), + new Action(ActionType.MOVE, new TopicPartition("topic-2", 1), 2, 6), + new Action(ActionType.MOVE, new TopicPartition("topic-2", 2), 3, 7), + new Action(ActionType.MOVE, new TopicPartition("topic-2", 3), 4, 8), + new Action(ActionType.MOVE, new TopicPartition("topic-3", 0), 11, 10), + new Action(ActionType.MOVE, new TopicPartition("topic-3", 1), 22, 10), + new Action(ActionType.MOVE, new TopicPartition("topic-3", 2), 33, 10), + new Action(ActionType.MOVE, new TopicPartition("topic-3", 3), 44, 10)); + List> groupedActions = anomalyDetector.checkAndGroupActions(actions, 2, Map.of( + "topic-1", 10, + "topic-2", 5, + "topic-3", 5 + )); + Assertions.assertEquals(4, groupedActions.size()); + List> expectedActions = List.of( + List.of( + new Action(ActionType.MOVE, new TopicPartition("topic-1", 0), 0, 1), + new Action(ActionType.MOVE, new TopicPartition("topic-1", 1), 0, 2) + ), + List.of( + new Action(ActionType.MOVE, new TopicPartition("topic-1", 2), 0, 3), + new Action(ActionType.MOVE, new TopicPartition("topic-1", 3), 0, 4), + new Action(ActionType.MOVE, new TopicPartition("topic-2", 0), 1, 5), + new Action(ActionType.MOVE, new TopicPartition("topic-2", 1), 2, 6), + new Action(ActionType.MOVE, new TopicPartition("topic-2", 2), 3, 7) + ), + List.of( + new Action(ActionType.MOVE, new TopicPartition("topic-2", 3), 4, 8), + new Action(ActionType.MOVE, new TopicPartition("topic-3", 0), 11, 10), + new Action(ActionType.MOVE, new TopicPartition("topic-3", 1), 22, 10) + ), + List.of( + new Action(ActionType.MOVE, new TopicPartition("topic-3", 2), 33, 10), + new Action(ActionType.MOVE, new TopicPartition("topic-3", 3), 44, 10) + ) + ); + + Assertions.assertEquals(expectedActions, groupedActions); + } + + @Test + @Timeout(10) public void testSchedulingTimeCost() { ClusterModel clusterModel = new ClusterModel(); @@ -186,6 +241,8 @@ public void testSchedulingTimeCost() { AnomalyDetector detector = new AnomalyDetectorBuilder() .clusterModel(clusterModel) .detectIntervalMs(Long.MAX_VALUE) + .executionIntervalMs(0) + .executionConcurrency(100) .addGoal(goal0) .addGoal(goal1) .executor(new ActionExecutorService() { @@ -200,23 +257,9 @@ public void shutdown() { } @Override - public void execute(Action action) { - actionList.add(action); - } - - @Override - public void execute(List actions) { + public CompletableFuture execute(List actions) { actionList.addAll(actions); - } - - @Override - public void validateReconfiguration(Map configs) throws ConfigException { - - } - - @Override - public void reconfigure(Map configs) { - + return CompletableFuture.completedFuture(null); } }) .build(); @@ -224,7 +267,7 @@ public void reconfigure(Map configs) { TimerUtil timerUtil = new TimerUtil(); detector.onLeaderChanged(true); detector.run(); - detector.detect0(); + Assertions.assertDoesNotThrow(detector::detect0); System.out.printf("Detect cost: %d ms, %d actions detected%n", timerUtil.elapsedAs(TimeUnit.MILLISECONDS), actionList.size()); Assertions.assertFalse(actionList.isEmpty());