Skip to content

Commit

Permalink
replace ratelimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
Andyz26 committed Aug 29, 2023
1 parent 23bd8a2 commit 260b93c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.server.master.resourcecluster.TaskExecutorStatusChange;
import io.mantisrx.shaded.com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand All @@ -45,7 +47,8 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
private final Counter disconnectionCounter;
private final Counter throttledCounter;

protected final RateLimiter rateLimiter;
private final Semaphore semaphore;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

ResourceClusterGatewayAkkaImpl(
ActorRef resourceClusterManagerActor,
Expand All @@ -56,7 +59,12 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {
this.askTimeout = askTimeout;
this.mapper = mapper;

this.rateLimiter = RateLimiter.create(maxConcurrentRequestCount);
this.semaphore = new Semaphore(maxConcurrentRequestCount);
scheduler.scheduleAtFixedRate(() -> {
semaphore.drainPermits();
semaphore.release(maxConcurrentRequestCount);
}, 1, 1, TimeUnit.SECONDS);

Metrics m = new Metrics.Builder()
.id("ResourceClusterGatewayAkkaImpl")
.addCounter("registrationCounter")
Expand All @@ -73,7 +81,7 @@ class ResourceClusterGatewayAkkaImpl implements ResourceClusterGateway {

private <In, Out> Function<In, CompletableFuture<Out>> withThrottle(Function<In, CompletableFuture<Out>> func) {
return in -> {
if (rateLimiter.tryAcquire(200, TimeUnit.MILLISECONDS)) {
if (semaphore.tryAcquire()) {
return func.apply(in);
} else {
this.throttledCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ default Duration getSchedulerIntervalBetweenRetries() {

// rate limit actions on resource cluster actor to control backlog.
@Config("mantis.master.resource.cluster.actions.permitsPerSecond")
@Default("300")
@Default("800")
int getResourceClusterActionsPermitsPerSecond();

default Duration getHeartbeatInterval() {
Expand Down

0 comments on commit 260b93c

Please sign in to comment.