Skip to content

Commit

Permalink
feat(auto_balancer): record network metrics by throttle priority (#1093)
Browse files Browse the repository at this point in the history
* feat(auto_balancer): record network metrics by throttle priority

Signed-off-by: Shichao Nie <[email protected]>

* feat(auto_balancer): give throttle strategy meaningful name

Signed-off-by: Shichao Nie <[email protected]>

---------

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Apr 7, 2024
1 parent 8e61929 commit 6cb92bf
Show file tree
Hide file tree
Showing 15 changed files with 482 additions and 148 deletions.
447 changes: 374 additions & 73 deletions docker/telemetry/grafana/provisioning/dashboards/Developer/detailed.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public CompletableFuture<FindIndexResult> find(long streamId, long startOffset,
}

/**
 * Range-reads the bytes of one data block from {@code objectKey} and wraps them in a
 * {@link DataBlockGroup}.
 *
 * <p>The read spans {@code block.startPosition()} to {@code block.endPosition()} and is issued
 * through {@code s3Operator.rangeRead} with an explicit {@link ThrottleStrategy}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers; the two
 * {@code rangeReadCf} declarations look like the pre-change ({@code THROTTLE_1}) and post-change
 * ({@code CATCH_UP}) versions of the same statement. Confirm against the repository.
 *
 * @param block index entry providing the start and end position of the block
 * @return future completed with a {@code DataBlockGroup} constructed from the bytes read
 */
public CompletableFuture<DataBlockGroup> read(DataBlockIndex block) {
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.THROTTLE_1);
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.CATCH_UP);
// Wrap the raw buffer once the read completes.
return rangeReadCf.thenApply(DataBlockGroup::new);
}

Expand Down
2 changes: 1 addition & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
if (context.readOptions().fastRead()) {
networkOutboundLimiter.forceConsume(totalSize);
} else {
return networkOutboundLimiter.consume(ThrottleStrategy.THROTTLE_1, totalSize).thenApply(nil -> rs);
return networkOutboundLimiter.consume(ThrottleStrategy.CATCH_UP, totalSize).thenApply(nil -> rs);
}
}
return CompletableFuture.completedFuture(rs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
long compactedEndOffset = objectGroup.get(objectGroup.size() - 1).endOffset();
List<Long> compactedObjectIds = new LinkedList<>();
CompositeByteBuf indexes = ByteBufAlloc.compositeByteBuffer();
Writer writer = s3Operator.writer(new Writer.Context(STREAM_OBJECT_COMPACTION_READ), ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
Writer writer = s3Operator.writer(new Writer.Context(STREAM_OBJECT_COMPACTION_READ), ObjectUtils.genKey(0, objectId), ThrottleStrategy.COMPACTION);
long groupStartOffset = -1L;
long groupStartPosition = -1L;
int groupSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private CompletableFuture<ByteBuf> rangeRead(long start, long end) {

private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
if (throttleBucket == null) {
return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.COMPACTION).thenApply(buf -> {
// convert heap buffer to direct buffer
ByteBuf directBuf = DIRECT_ALLOC.byteBuffer(buf.readableBytes());
directBuf.writeBytes(buf);
Expand All @@ -205,7 +205,7 @@ private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
} else {
return throttleBucket.asScheduler().consume(end - start + 1, bucketCallbackExecutor)
.thenCompose(v ->
s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.COMPACTION).thenApply(buf -> {
// convert heap buffer to direct buffer
ByteBuf directBuf = DIRECT_ALLOC.byteBuffer(buf.readableBytes());
directBuf.writeBytes(buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public DataBlockWriter(long objectId, S3Operator s3Operator, int partSizeThresho
waitingUploadBlocks = new LinkedList<>();
waitingUploadBlockCfs = new ConcurrentHashMap<>();
completedBlocks = new LinkedList<>();
writer = s3Operator.writer(new Writer.Context(STREAM_SET_OBJECT_COMPACTION_READ), objectKey, ThrottleStrategy.THROTTLE_2);
writer = s3Operator.writer(new Writer.Context(STREAM_SET_OBJECT_COMPACTION_READ), objectKey, ThrottleStrategy.COMPACTION);
}

public long getObjectId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.network.ThrottleStrategy;
import io.opentelemetry.api.common.Attributes;

public class AttributesUtils {
Expand Down Expand Up @@ -44,6 +45,12 @@ public static Attributes buildAttributes(String status) {
.build();
}

/**
 * Builds an attribute set that tags a metric with a throttle strategy.
 *
 * @param strategy throttle strategy whose {@code getName()} value is stored under
 *                 {@code S3StreamMetricsConstant.LABEL_TYPE}
 * @return attributes containing that single label
 */
public static Attributes buildAttributes(ThrottleStrategy strategy) {
    final String typeLabel = strategy.getName();
    return Attributes.builder().put(S3StreamMetricsConstant.LABEL_TYPE, typeLabel).build();
}

public static Attributes buildStatusStageAttributes(String status, String stage) {
return Attributes.builder()
.put(S3StreamMetricsConstant.LABEL_STATUS, status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public class S3StreamMetricsConstant {
public static final AttributeKey<String> LABEL_SIZE_NAME = AttributeKey.stringKey("size");
public static final AttributeKey<String> LABEL_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_ALLOC_TYPE = AttributeKey.stringKey("type");
public static final AttributeKey<String> LABEL_TYPE = AttributeKey.stringKey("type");
public static final AttributeKey<String> LABEL_INDEX = AttributeKey.stringKey("index");
public static final String LABEL_STATUS_SUCCESS = "success";
public static final String LABEL_STATUS_FAILED = "failed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.automq.stream.s3.metrics.wrapper.HistogramInstrument;
import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -91,7 +92,7 @@ public class S3StreamMetricsManager {
private static Supplier<Integer> inflightWALUploadTasksCountSupplier = () -> 0;
private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty());
private static final MultiAttributes<String> ALLOC_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_ALLOC_TYPE);
S3StreamMetricsConstant.LABEL_TYPE);
private static final MultiAttributes<String> OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_INDEX);

Expand Down Expand Up @@ -431,34 +432,40 @@ public static YammerHistogramMetric buildObjectUploadSizeMetric(MetricName metri
}
}

/**
 * Creates a counter metric backed by {@code networkInboundUsageInTotal}, registers it in
 * {@code BASE_ATTRIBUTES_LISTENERS}, and returns it.
 *
 * <p>Creation and registration happen while holding the monitor of
 * {@code BASE_ATTRIBUTES_LISTENERS}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * no-argument signature and the two-argument {@code CounterMetric} constructor call look like the
 * pre-change lines; the {@code ThrottleStrategy} signature and the call passing
 * {@code AttributesUtils.buildAttributes(strategy)} look like their replacements. Confirm.
 *
 * @param strategy throttle strategy used to build the counter's attributes (post-change version)
 * @return the newly created and registered counter
 */
public static CounterMetric buildNetworkInboundUsageMetric() {
public static CounterMetric buildNetworkInboundUsageMetric(ThrottleStrategy strategy) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
CounterMetric metric = new CounterMetric(metricsConfig, networkInboundUsageInTotal);
CounterMetric metric = new CounterMetric(metricsConfig, AttributesUtils.buildAttributes(strategy), networkInboundUsageInTotal);
BASE_ATTRIBUTES_LISTENERS.add(metric);
return metric;
}
}

/**
 * Creates a counter metric backed by {@code networkOutboundUsageInTotal}, registers it in
 * {@code BASE_ATTRIBUTES_LISTENERS}, and returns it.
 *
 * <p>Creation and registration happen while holding the monitor of
 * {@code BASE_ATTRIBUTES_LISTENERS}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * no-argument signature and the two-argument {@code CounterMetric} constructor call look like the
 * pre-change lines; the {@code ThrottleStrategy} signature and the call passing
 * {@code AttributesUtils.buildAttributes(strategy)} look like their replacements. Confirm.
 *
 * @param strategy throttle strategy used to build the counter's attributes (post-change version)
 * @return the newly created and registered counter
 */
public static CounterMetric buildNetworkOutboundUsageMetric() {
public static CounterMetric buildNetworkOutboundUsageMetric(ThrottleStrategy strategy) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
CounterMetric metric = new CounterMetric(metricsConfig, networkOutboundUsageInTotal);
CounterMetric metric = new CounterMetric(metricsConfig, AttributesUtils.buildAttributes(strategy), networkOutboundUsageInTotal);
BASE_ATTRIBUTES_LISTENERS.add(metric);
return metric;
}
}

/**
 * Creates a histogram metric for inbound limiter queue time and registers it in both
 * {@code BASE_ATTRIBUTES_LISTENERS} and {@code NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS}.
 *
 * <p>Creation and both registrations happen while holding the monitor of
 * {@code BASE_ATTRIBUTES_LISTENERS}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * single-line two-parameter signature and the three-argument constructor call look like the
 * pre-change lines; the multi-line three-parameter signature and the constructor call that adds
 * {@code AttributesUtils.buildAttributes(strategy)} look like their replacements. Confirm.
 *
 * @param metricName   name passed to the {@code YammerHistogramMetric} constructor
 * @param metricsLevel level passed to the {@code YammerHistogramMetric} constructor
 * @param strategy     throttle strategy used to build the histogram's attributes
 *                     (post-change version)
 * @return the newly created and registered histogram
 */
public static YammerHistogramMetric buildNetworkInboundLimiterQueueTimeMetric(MetricName metricName, MetricsLevel metricsLevel) {
public static YammerHistogramMetric buildNetworkInboundLimiterQueueTimeMetric(MetricName metricName,
MetricsLevel metricsLevel,
ThrottleStrategy strategy) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
AttributesUtils.buildAttributes(strategy));
BASE_ATTRIBUTES_LISTENERS.add(metric);
NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric);
return metric;
}
}

public static YammerHistogramMetric buildNetworkOutboundLimiterQueueTimeMetric(MetricName metricName, MetricsLevel metricsLevel) {
public static YammerHistogramMetric buildNetworkOutboundLimiterQueueTimeMetric(MetricName metricName,
MetricsLevel metricsLevel,
ThrottleStrategy strategy) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
AttributesUtils.buildAttributes(strategy));
BASE_ATTRIBUTES_LISTENERS.add(metric);
NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric);
return metric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@
import com.automq.stream.s3.metrics.wrapper.CounterMetric;
import com.automq.stream.s3.metrics.wrapper.YammerHistogramMetric;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.yammer.metrics.core.MetricName;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NetworkStats {
private volatile static NetworkStats instance = null;

private final CounterMetric networkInboundUsageStats = S3StreamMetricsManager.buildNetworkInboundUsageMetric();
private final CounterMetric networkOutboundUsageStats = S3StreamMetricsManager.buildNetworkOutboundUsageMetric();
private final YammerHistogramMetric networkInboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(
new MetricName(NetworkStats.class, "NetworkInboundLimiterQueueTime"), MetricsLevel.INFO);
private final YammerHistogramMetric networkOutboundLimiterQueueTimeStats = S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(
new MetricName(NetworkStats.class, "NetworkOutboundLimiterQueueTime"), MetricsLevel.INFO);
private final Map<ThrottleStrategy, CounterMetric> networkInboundUsageStats = new ConcurrentHashMap<>();
private final Map<ThrottleStrategy, CounterMetric> networkOutboundUsageStats = new ConcurrentHashMap<>();
private final Map<ThrottleStrategy, YammerHistogramMetric> networkInboundLimiterQueueTimeStatsMap = new ConcurrentHashMap<>();
private final Map<ThrottleStrategy, YammerHistogramMetric> networkOutboundLimiterQueueTimeStatsMap = new ConcurrentHashMap<>();

private NetworkStats() {
}
Expand All @@ -42,11 +44,19 @@ public static NetworkStats getInstance() {
return instance;
}

/**
 * Returns the network usage counter for the given traffic direction.
 *
 * <p>{@code Type.INBOUND} selects the inbound counter; any other value selects the outbound one.
 * In the two-parameter version the counters live in per-direction maps keyed by
 * {@link ThrottleStrategy}; a missing entry is created through {@code computeIfAbsent} using the
 * matching {@code S3StreamMetricsManager.buildNetwork*UsageMetric(strategy)} factory.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * one-parameter signature and its single-line {@code return} look like the pre-change lines;
 * the two-parameter signature and the {@code computeIfAbsent} expression look like their
 * replacements. Confirm.
 *
 * @param type     limiter direction
 * @param strategy throttle strategy whose counter is requested (post-change version)
 * @return the counter for that direction (and strategy, post-change)
 */
public CounterMetric networkUsageStats(AsyncNetworkBandwidthLimiter.Type type) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND ? networkInboundUsageStats : networkOutboundUsageStats;
public CounterMetric networkUsageStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
? networkInboundUsageStats.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundUsageMetric(strategy))
: networkOutboundUsageStats.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundUsageMetric(strategy));
}

/**
 * Returns the limiter queue-time histogram for the given traffic direction.
 *
 * <p>{@code Type.INBOUND} selects the inbound histogram; any other value selects the outbound
 * one. In the two-parameter version the histograms live in per-direction maps keyed by
 * {@link ThrottleStrategy}; a missing entry is created through {@code computeIfAbsent}. The
 * created metric is named {@code "NetworkInboundLimiterQueueTime-"} or
 * {@code "NetworkOutboundLimiterQueueTime-"} followed by {@code strategy.getName()}, at
 * {@code MetricsLevel.INFO}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * one-parameter signature and its single-line {@code return} look like the pre-change lines;
 * the two-parameter signature and the {@code computeIfAbsent} expression look like their
 * replacements. Confirm.
 *
 * @param type     limiter direction
 * @param strategy throttle strategy whose histogram is requested (post-change version)
 * @return the histogram for that direction (and strategy, post-change)
 */
public YammerHistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND ? networkInboundLimiterQueueTimeStats : networkOutboundLimiterQueueTimeStats;
public YammerHistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(
new MetricName(NetworkStats.class, "NetworkInboundLimiterQueueTime-" + strategy.getName()),
MetricsLevel.INFO, strategy))
: networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(
new MetricName(NetworkStats.class, "NetworkOutboundLimiterQueueTime-" + strategy.getName()),
MetricsLevel.INFO, strategy));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public AsyncNetworkBandwidthLimiter(Type type, long tokenSize, int refillInterva
BucketItem head = queuedCallbacks.poll();
reduceToken(head.size);
extraTokens = Math.max(0, extraTokens - head.size);
logMetrics(head.size);
logMetrics(head.size, head.strategy);
head.cf.complete(null);
}
} catch (InterruptedException ignored) {
Expand Down Expand Up @@ -126,7 +126,7 @@ public void forceConsume(long size) {
lock.lock();
try {
reduceToken(size);
logMetrics(size);
logMetrics(size, ThrottleStrategy.BYPASS);
} finally {
lock.unlock();
}
Expand All @@ -138,25 +138,19 @@ public CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long s
forceConsume(size);
cf.complete(null);
} else {
cf = consume(throttleStrategy.priority(), size);
}
return cf;
}

private CompletableFuture<Void> consume(int priority, long size) {
CompletableFuture<Void> cf = new CompletableFuture<>();
lock.lock();
try {
if (availableTokens < 0 || !queuedCallbacks.isEmpty()) {
queuedCallbacks.add(new BucketItem(priority, size, cf));
condition.signalAll();
} else {
reduceToken(size);
cf.complete(null);
logMetrics(size);
lock.lock();
try {
if (availableTokens < 0 || !queuedCallbacks.isEmpty()) {
queuedCallbacks.add(new BucketItem(throttleStrategy, size, cf));
condition.signalAll();
} else {
reduceToken(size);
cf.complete(null);
logMetrics(size, throttleStrategy);
}
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
return cf;
}
Expand All @@ -165,8 +159,8 @@ private void reduceToken(long size) {
this.availableTokens = Math.max(-maxTokens, availableTokens - size);
}

/**
 * Adds {@code size} to the network usage counter obtained from {@code NetworkStats} for this
 * limiter's {@code type}, recorded at {@code MetricsLevel.INFO}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * one-parameter signature and body look like the pre-change lines; the version that also takes
 * a {@link ThrottleStrategy} and passes it to {@code networkUsageStats} looks like the
 * replacement. Confirm.
 *
 * @param size     amount added to the counter
 * @param strategy throttle strategy selecting which counter is updated (post-change version)
 */
private void logMetrics(long size) {
NetworkStats.getInstance().networkUsageStats(type).add(MetricsLevel.INFO, size);
private void logMetrics(long size, ThrottleStrategy strategy) {
NetworkStats.getInstance().networkUsageStats(type, strategy).add(MetricsLevel.INFO, size);
}

public enum Type {
Expand All @@ -185,27 +179,19 @@ public String getName() {
}

static final class BucketItem implements Comparable<BucketItem> {
private final int priority;
private final ThrottleStrategy strategy;
private final long size;
private final CompletableFuture<Void> cf;

BucketItem(int priority, long size, CompletableFuture<Void> cf) {
this.priority = priority;
BucketItem(ThrottleStrategy strategy, long size, CompletableFuture<Void> cf) {
this.strategy = strategy;
this.size = size;
this.cf = cf;
}

@Override
public int compareTo(BucketItem o) {
return Long.compare(priority, o.priority);
}

public int priority() {
return priority;
}

public long size() {
return size;
return Long.compare(strategy.priority(), o.strategy.priority());
}

public CompletableFuture<Void> cf() {
Expand All @@ -219,20 +205,20 @@ public boolean equals(Object obj) {
if (obj == null || obj.getClass() != this.getClass())
return false;
var that = (BucketItem) obj;
return this.priority == that.priority &&
return this.strategy == that.strategy &&
this.size == that.size &&
Objects.equals(this.cf, that.cf);
}

@Override
public int hashCode() {
return Objects.hash(priority, size, cf);
return Objects.hash(strategy, size, cf);
}

@Override
public String toString() {
return "BucketItem[" +
"priority=" + priority + ", " +
"throttleStrategy=" + strategy + ", " +
"size=" + size + ", " +
"cf=" + cf + ']';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,23 @@
package com.automq.stream.s3.network;

/**
 * Throttle strategies passed to the network bandwidth limiter and to S3 read/write calls.
 *
 * <p>Each constant carries an integer {@code priority} and, in the two-argument form, a string
 * {@code name} exposed through {@link #getName()}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers. The
 * single-argument constants ({@code BYPASS(0)}, {@code THROTTLE_1(1)}, {@code THROTTLE_2(2)})
 * and the single-argument constructor header look like the pre-change lines; the two-argument
 * constants and constructor header look like their replacements. Confirm.
 */
public enum ThrottleStrategy {
BYPASS(0),
THROTTLE_1(1),
THROTTLE_2(2);
// Renamed constants: THROTTLE_1 -> CATCH_UP, THROTTLE_2 -> COMPACTION, each with a name string.
BYPASS(0, "bypass"),
CATCH_UP(1, "catchup"),
COMPACTION(2, "compaction");

// Numeric priority of the strategy.
// NOTE(review): presumably a lower value is served first by the limiter queue — verify.
private final int priority;
// Short name returned by getName().
private final String name;

ThrottleStrategy(int priority) {
ThrottleStrategy(int priority, String name) {
this.priority = priority;
this.name = name;
}

/** Returns the numeric priority assigned to this strategy. */
public int priority() {
return priority;
}

/** Returns the short name assigned to this strategy (for example {@code "catchup"}). */
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
if (networkInboundBandwidthLimiter != null) {
TimerUtil timerUtil = new TimerUtil();
networkInboundBandwidthLimiter.consume(throttleStrategy, end - start).whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND)
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND, throttleStrategy)
.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
Expand Down Expand Up @@ -384,7 +384,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
if (networkOutboundBandwidthLimiter != null) {
TimerUtil timerUtil = new TimerUtil();
networkOutboundBandwidthLimiter.consume(throttleStrategy, data.readableBytes()).whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy)
.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
Expand Down
Loading

0 comments on commit 6cb92bf

Please sign in to comment.