Skip to content

Commit

Permalink
feat(auto_balancer): define required metrics in MetricVersion (#2048)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Oct 9, 2024
1 parent f7ffd88 commit f2681d8
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,70 @@

package kafka.autobalancer.common.types;

import java.util.Collections;
import java.util.Set;
import kafka.autobalancer.goals.Goal;
import kafka.autobalancer.goals.NetworkInUsageDistributionGoal;
import kafka.autobalancer.goals.NetworkOutUsageDistributionGoal;

import java.util.Objects;

import static kafka.autobalancer.common.types.RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS;
import static kafka.autobalancer.common.types.RawMetricTypes.BROKER_MAX_PENDING_APPEND_LATENCY_MS;
import static kafka.autobalancer.common.types.RawMetricTypes.BROKER_MAX_PENDING_FETCH_LATENCY_MS;
import static kafka.autobalancer.common.types.RawMetricTypes.BROKER_METRIC_VERSION;
import static kafka.autobalancer.common.types.RawMetricTypes.PARTITION_BYTES_IN;
import static kafka.autobalancer.common.types.RawMetricTypes.PARTITION_BYTES_OUT;
import static kafka.autobalancer.common.types.RawMetricTypes.PARTITION_SIZE;

public class MetricVersion {
public static final MetricVersion V0 = new MetricVersion((short) 0);
public static final MetricVersion V1 = new MetricVersion((short) 1);
public static final MetricVersion LATEST_VERSION = V1;
public static final MetricVersion V0 = new MetricVersion((short) 0,
Collections.emptySet(),
Set.of(PARTITION_BYTES_IN, PARTITION_BYTES_OUT, PARTITION_SIZE));
public static final MetricVersion V1 = new MetricVersion((short) 1,
Set.of(BROKER_APPEND_LATENCY_AVG_MS, BROKER_MAX_PENDING_APPEND_LATENCY_MS, BROKER_MAX_PENDING_FETCH_LATENCY_MS, BROKER_METRIC_VERSION),
Set.of(PARTITION_BYTES_IN, PARTITION_BYTES_OUT, PARTITION_SIZE)
);
public static final MetricVersion V2 = new MetricVersion((short) 2,
Set.of(BROKER_APPEND_LATENCY_AVG_MS, BROKER_METRIC_VERSION),
Set.of(PARTITION_BYTES_IN, PARTITION_BYTES_OUT, PARTITION_SIZE)
);
public static final MetricVersion LATEST_VERSION = V2;
private final short value;
private final Set<Byte> requiredBrokerMetrics;
private final Set<Byte> requiredPartitionMetrics;

public MetricVersion(short version) {
public MetricVersion(short version, Set<Byte> requiredBrokerMetrics, Set<Byte> requiredPartitionMetrics) {
this.value = version;
this.requiredBrokerMetrics = requiredBrokerMetrics;
this.requiredPartitionMetrics = requiredPartitionMetrics;
}

public static MetricVersion of(short version) {
switch (version) {
case 0:
return V0;
case 1:
return V1;
case 2:
return V2;
default:
throw new IllegalArgumentException("Unknown metric version: " + version);
}
}

public short value() {
return value;
}

public Set<Byte> requiredBrokerMetrics() {
return requiredBrokerMetrics;
}

public Set<Byte> requiredPartitionMetrics() {
return requiredPartitionMetrics;
}

public boolean isSlowBrokerSupported() {
return isAfter(V0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@

package kafka.autobalancer.common.types;

import java.util.Map;
import kafka.autobalancer.common.types.metrics.AbnormalLatency;
import kafka.autobalancer.common.types.metrics.AbnormalMetric;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class RawMetricTypes {
public static final byte PARTITION_BYTES_IN = (byte) 0;
public static final byte PARTITION_BYTES_OUT = (byte) 1;
Expand All @@ -26,27 +23,12 @@ public class RawMetricTypes {
public static final byte BROKER_MAX_PENDING_APPEND_LATENCY_MS = (byte) 4;
public static final byte BROKER_MAX_PENDING_FETCH_LATENCY_MS = (byte) 5;
public static final byte BROKER_METRIC_VERSION = (byte) 6;
public static final Set<Byte> PARTITION_METRICS = Set.of(PARTITION_BYTES_IN, PARTITION_BYTES_OUT, PARTITION_SIZE);
public static final Set<Byte> BROKER_METRICS = Set.of(BROKER_APPEND_LATENCY_AVG_MS,
BROKER_MAX_PENDING_APPEND_LATENCY_MS, BROKER_MAX_PENDING_FETCH_LATENCY_MS, BROKER_METRIC_VERSION);
public static final Map<Byte, AbnormalMetric> ABNORMAL_METRICS = Map.of(
BROKER_APPEND_LATENCY_AVG_MS, new AbnormalLatency(100), // 100ms
BROKER_MAX_PENDING_APPEND_LATENCY_MS, new AbnormalLatency(10000), // 10s
BROKER_MAX_PENDING_FETCH_LATENCY_MS, new AbnormalLatency(10000) // 10s
);

public static Set<Byte> requiredPartitionMetrics(MetricVersion metricVersion) {
// same partition metrics requirement for ALL metric versions
return PARTITION_METRICS;
}

public static Set<Byte> requiredBrokerMetrics(MetricVersion metricVersion) {
if (metricVersion.isAfter(MetricVersion.V0)) {
return BROKER_METRICS;
}
return Collections.emptySet();
}

public static AbnormalMetric ofAbnormalType(byte metricType) {
return ABNORMAL_METRICS.get(metricType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ protected void processMetrics(YammerMetricProcessor.Context context) throws Exce
protected void processBrokerMetrics(YammerMetricProcessor.Context context) {
context.merge(new BrokerMetrics(context.time(), brokerId, brokerRack)
.put(RawMetricTypes.BROKER_METRIC_VERSION, MetricVersion.LATEST_VERSION.value())
// TODO: fix latency calculation
.put(RawMetricTypes.BROKER_APPEND_LATENCY_AVG_MS,
TimeUnit.NANOSECONDS.toMillis((long) appendLatencyAvg.derive(
StreamOperationStats.getInstance().appendStreamLatency.sum(),
Expand Down Expand Up @@ -419,7 +420,7 @@ protected void processYammerMetrics(YammerMetricProcessor.Context context) throw
protected void addMissingMetrics(YammerMetricProcessor.Context context) {
for (AutoBalancerMetrics metrics : context.getMetricMap().values()) {
if (metrics.metricType() == MetricTypes.TOPIC_PARTITION_METRIC) {
for (byte key : RawMetricTypes.requiredPartitionMetrics(MetricVersion.LATEST_VERSION)) {
for (byte key : MetricVersion.LATEST_VERSION.requiredPartitionMetrics()) {
metrics.getMetricValueMap().putIfAbsent(key, 0.0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ private static AutoBalancerMetrics bytesOutToMetric(String topic, int partition,
}

public static boolean sanityCheckTopicPartitionMetricsCompleteness(AutoBalancerMetrics metrics) {
return metrics.getMetricValueMap().keySet().containsAll(RawMetricTypes.requiredPartitionMetrics(MetricVersion.LATEST_VERSION));
return metrics.getMetricValueMap().keySet().containsAll(MetricVersion.LATEST_VERSION.requiredPartitionMetrics());
}

public static boolean sanityCheckBrokerMetricsCompleteness(AutoBalancerMetrics metrics) {
return metrics.getMetricValueMap().keySet().containsAll(RawMetricTypes.requiredBrokerMetrics(MetricVersion.LATEST_VERSION));
return metrics.getMetricValueMap().keySet().containsAll(MetricVersion.LATEST_VERSION.requiredBrokerMetrics());
}

public static String topicPartitionKey(String topic, int partition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void setActive(boolean active) {
@Override
protected boolean processMetric(byte metricType, double value) {
if (metricType == RawMetricTypes.BROKER_METRIC_VERSION) {
this.metricVersion = new MetricVersion((short) value);
this.metricVersion = MetricVersion.of((short) value);
}
return true;
}
Expand Down Expand Up @@ -98,7 +98,7 @@ protected AbstractInstance createInstance(boolean metricsOutOfDate) {

@Override
protected Set<Byte> requiredMetrics() {
return RawMetricTypes.requiredBrokerMetrics(metricVersion);
return metricVersion.requiredBrokerMetrics();
}

protected Map<Byte, Snapshot> getMetricsSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected AbstractInstance createInstance(boolean metricsOutOfDate) {

@Override
protected Set<Byte> requiredMetrics() {
return RawMetricTypes.requiredPartitionMetrics(metricVersion);
return metricVersion.requiredPartitionMetrics();
}

public void setMetricVersion(MetricVersion metricVersion) {
Expand Down

0 comments on commit f2681d8

Please sign in to comment.