From 260b93cdc9feebcabc6a1312e5f2b7cf9dfa0783 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Mon, 28 Aug 2023 20:04:28 -0700 Subject: [PATCH] replace ratelimiter --- .../ResourceClusterGatewayAkkaImpl.java | 16 ++++++++++++---- .../master/config/MasterConfiguration.java | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java index 3a04e1200..14a8328f4 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterGatewayAkkaImpl.java @@ -30,9 +30,11 @@ import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat; import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration; import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange; -import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -45,7 +47,8 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway { private final Counter disconnectionCounter; private final Counter throttledCounter; - protected final RateLimiter rateLimiter; + private final Semaphore semaphore; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); ResourceClusterGatewayAkkaImpl( ActorRef resourceClusterManagerActor, @@ -56,7 +59,12 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway { this.askTimeout = askTimeout; this.mapper = mapper; - this.rateLimiter = RateLimiter.create(maxConcurrentRequestCount); + this.semaphore = new Semaphore(maxConcurrentRequestCount); + scheduler.scheduleAtFixedRate(() -> { + semaphore.drainPermits(); + semaphore.release(maxConcurrentRequestCount); + }, 1, 1, TimeUnit.SECONDS); + Metrics m = new Metrics.Builder() .id("ResourceClusterGatewayAkkaImpl") .addCounter("registrationCounter") @@ -73,7 +81,7 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway { private Function> withThrottle(Function> func) { return in -> { - if (rateLimiter.tryAcquire(200, TimeUnit.MILLISECONDS)) { + if (semaphore.tryAcquire()) { return func.apply(in); } else { this.throttledCounter.increment(); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java index ce8042268..9e04aac36 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/config/MasterConfiguration.java @@ -380,7 +380,7 @@ default Duration getSchedulerIntervalBetweenRetries() { // rate limit actions on resource cluster actor to control backlog. @Config("mantis.master.resource.cluster.actions.permitsPerSecond") - @Default("300") + @Default("800") int getResourceClusterActionsPermitsPerSecond(); default Duration getHeartbeatInterval() {