Skip to content

Commit

Permalink
feat(throttle): subtract extra token from total available token (#1163)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Apr 24, 2024
1 parent e81a0bd commit c7370bd
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 37 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public DefaultS3Client(BrokerServer brokerServer, KafkaConfig kafkaConfig) {
String region = kafkaConfig.s3Region();
String bucket = kafkaConfig.s3Bucket();
networkInboundLimiter = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND,
config.networkBaselineBandwidth(), config.refillPeriodMs(), config.networkBaselineBandwidth());
config.networkBaselineBandwidth(), config.refillPeriodMs());
networkOutboundLimiter = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.OUTBOUND,
config.networkBaselineBandwidth(), config.refillPeriodMs(), config.networkBaselineBandwidth());
config.networkBaselineBandwidth(), config.refillPeriodMs());
List<AwsCredentialsProvider> credentialsProviders = List.of(CredentialsProviderHolder.getAwsCredentialsProvider(), EnvVariableCredentialsProvider.get());
boolean forcePathStyle = this.config.forcePathStyle();
S3Operator s3Operator = DefaultS3Operator.builder().endpoint(endpoint).region(region).bucket(bucket).credentialsProviders(credentialsProviders).tagging(config.objectTagging())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,31 @@ public class AsyncNetworkBandwidthLimiter {
private final ExecutorService callbackThreadPool;
private final Queue<BucketItem> queuedCallbacks;
private final Type type;
private final long tokenSize;
private final long extraTokenSize;
private long availableTokens;
private long extraTokens;
private long availableExtraTokens;

public AsyncNetworkBandwidthLimiter(Type type, long tokenSize, int refillIntervalMs, long maxTokenSize) {
public AsyncNetworkBandwidthLimiter(Type type, long tokenSize, int refillIntervalMs) {
this.type = type;
this.availableTokens = tokenSize;
this.maxTokens = maxTokenSize;
this.extraTokenSize = (long) (tokenSize * DEFAULT_EXTRA_TOKEN_RATIO);
this.tokenSize = tokenSize - extraTokenSize;
this.availableTokens = this.tokenSize;
this.maxTokens = tokenSize;
this.queuedCallbacks = new PriorityQueue<>();
this.refillThreadPool = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("refill-bucket-thread"));
this.callbackThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("callback-thread"));
this.callbackThreadPool.execute(() -> {
while (true) {
lock.lock();
try {
while (queuedCallbacks.isEmpty() || (availableTokens <= 0 && extraTokens <= 0)) {
while (queuedCallbacks.isEmpty() || (availableTokens <= 0 && availableExtraTokens <= 0)) {
condition.await();
}
while (!queuedCallbacks.isEmpty() && (availableTokens > 0 || extraTokens > 0)) {
while (!queuedCallbacks.isEmpty() && (availableTokens > 0 || availableExtraTokens > 0)) {
BucketItem head = queuedCallbacks.poll();
reduceToken(head.size);
extraTokens = Math.max(0, extraTokens - head.size);
availableExtraTokens = Math.max(0, availableExtraTokens - head.size);
logMetrics(head.size, head.strategy);
head.cf.complete(null);
}
Expand All @@ -70,13 +74,13 @@ public AsyncNetworkBandwidthLimiter(Type type, long tokenSize, int refillInterva
this.refillThreadPool.scheduleAtFixedRate(() -> {
lock.lock();
try {
availableTokens = Math.min(availableTokens + tokenSize, maxTokenSize);
availableTokens = Math.min(availableTokens + this.tokenSize, this.tokenSize);
if (availableTokens <= 0) {
// provide extra tokens to prevent starvation when available tokens are exhausted
extraTokens = (long) (tokenSize * DEFAULT_EXTRA_TOKEN_RATIO);
availableExtraTokens = this.extraTokenSize;
} else {
// disable extra tokens when available tokens are sufficient
extraTokens = 0;
availableExtraTokens = 0;
}
condition.signalAll();
} finally {
Expand Down Expand Up @@ -104,10 +108,10 @@ public long getAvailableTokens() {
}
}

public long getExtraTokens() {
public long getAvailableExtraTokens() {
lock.lock();
try {
return extraTokens;
return availableExtraTokens;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -156,7 +160,7 @@ public CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long s
}

private void reduceToken(long size) {
this.availableTokens = Math.max(-maxTokens, availableTokens - size);
this.availableTokens = Math.max(-tokenSize, availableTokens - size);
}

private void logMetrics(long size, ThrottleStrategy strategy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,86 +23,87 @@ public class AsyncNetworkBandwidthLimiterTest {

@Test
public void testByPassConsume() {
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 5000, 100);
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 5000);
CompletableFuture<Void> cf = bucket.consume(ThrottleStrategy.BYPASS, 1);
Assertions.assertEquals(9, bucket.getAvailableTokens());
Assertions.assertEquals(8, bucket.getAvailableTokens());
Assertions.assertTrue(cf.isDone());
}

@Test
public void testByPassConsume2() {
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 1000, 100);
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 1000);
CompletableFuture<Void> cf = bucket.consume(ThrottleStrategy.BYPASS, 20);
Assertions.assertEquals(-10, bucket.getAvailableTokens());
Assertions.assertEquals(-9, bucket.getAvailableTokens());
cf.whenComplete((v, e) -> {
Assertions.assertNull(e);
Assertions.assertEquals(-10, bucket.getAvailableTokens());
Assertions.assertEquals(-9, bucket.getAvailableTokens());
});
cf.join();
}

@Test
public void testThrottleConsume() {
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 1000, 100);
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 1000);
CompletableFuture<Void> cf = bucket.consume(ThrottleStrategy.CATCH_UP, 1);
Assertions.assertEquals(9, bucket.getAvailableTokens());
Assertions.assertEquals(8, bucket.getAvailableTokens());
Assertions.assertTrue(cf.isDone());
}

@Test
public void testThrottleConsume2() {
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 1000, 100);
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 1000);
CompletableFuture<Void> cf = bucket.consume(ThrottleStrategy.CATCH_UP, 20);
Assertions.assertEquals(-10, bucket.getAvailableTokens());
Assertions.assertEquals(-9, bucket.getAvailableTokens());
cf.whenComplete((v, e) -> {
Assertions.assertNull(e);
Assertions.assertEquals(-10, bucket.getAvailableTokens());
Assertions.assertEquals(-9, bucket.getAvailableTokens());
});
cf.join();
}

@Test
public void testThrottleConsume3() {
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 1000, 100);
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 10, 1000);
CompletableFuture<Void> cf = bucket.consume(ThrottleStrategy.BYPASS, 20);
Assertions.assertEquals(-10, bucket.getAvailableTokens());
Assertions.assertEquals(-9, bucket.getAvailableTokens());
Assertions.assertTrue(cf.isDone());
cf = bucket.consume(ThrottleStrategy.CATCH_UP, 10);
Assertions.assertEquals(-10, bucket.getAvailableTokens());
Assertions.assertEquals(-9, bucket.getAvailableTokens());
cf.whenComplete((v, e) -> {
Assertions.assertNull(e);
Assertions.assertEquals(0, bucket.getAvailableTokens());
Assertions.assertEquals(-9, bucket.getAvailableTokens());
Assertions.assertEquals(0, bucket.getAvailableExtraTokens());
});
cf.join();
}

@Test
public void testThrottleConsume4() {
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 100, 1000, 100);
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 100, 1000);
bucket.consume(ThrottleStrategy.BYPASS, 1000);
Assertions.assertEquals(-100, bucket.getAvailableTokens());
Assertions.assertEquals(-90, bucket.getAvailableTokens());
CompletableFuture<Void> cf = bucket.consume(ThrottleStrategy.CATCH_UP, 5);
bucket.consume(ThrottleStrategy.CATCH_UP, 10);
cf.whenComplete((v, e) -> {
Assertions.assertNull(e);
Assertions.assertEquals(0, bucket.getAvailableTokens());
Assertions.assertEquals(5, bucket.getExtraTokens());
Assertions.assertEquals(-5, bucket.getAvailableTokens());
Assertions.assertEquals(5, bucket.getAvailableExtraTokens());
});
cf.join();
}

@Test
public void testThrottleConsumeWithPriority() {
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 100, 1000, 100);
AsyncNetworkBandwidthLimiter bucket = new AsyncNetworkBandwidthLimiter(AsyncNetworkBandwidthLimiter.Type.INBOUND, 100, 1000);
bucket.consume(ThrottleStrategy.BYPASS, 1000);
Assertions.assertEquals(-100, bucket.getAvailableTokens());
Assertions.assertEquals(-90, bucket.getAvailableTokens());
bucket.consume(ThrottleStrategy.CATCH_UP, 10);
CompletableFuture<Void> cf = bucket.consume(ThrottleStrategy.COMPACTION, 100);
CompletableFuture<Void> cf2 = bucket.consume(ThrottleStrategy.CATCH_UP, 100);
cf2.whenComplete((v, e) -> {
Assertions.assertNull(e);
Assertions.assertEquals(-100, bucket.getAvailableTokens());
Assertions.assertEquals(0, bucket.getExtraTokens());
Assertions.assertEquals(-10, bucket.getAvailableTokens());
Assertions.assertEquals(0, bucket.getAvailableExtraTokens());
Assertions.assertFalse(cf.isDone());
});
cf.join();
Expand Down

0 comments on commit c7370bd

Please sign in to comment.