Skip to content

Commit

Permalink
App_Resiliency_Changes. (#247)
Browse files Browse the repository at this point in the history
* App_Resiliency_Changes.

* Calculate limit one time,TooManyTasks update, renaming .

* Concurrency Changes .

* Concurrency Changes.

* Comment principal name logic.

* Concurrency Issue Optimistic Locking & Init Changes.

* Fix internal task issue.

* maxPctParallelRequestsPerActor to 25.

* Review Comments Fix.

* Correct Function Description.

* Long to long.

* Fix QueryTask Add Context.

* Review Comments Code Fix.
  • Loading branch information
rlakde authored May 17, 2024
1 parent 320da5c commit 5670771
Show file tree
Hide file tree
Showing 15 changed files with 350 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"type": "Config",
"httpPort": 7080,
"requestBodyLimit": 25,
"maxParallelRequestsPerCPU": 30,
"maxPctParallelRequestsPerActor": 100,
"authMode": "JWT",
"extensionConfigParams": {
"whitelistClasses": [ "java.*", "javax.*", "com.here.*", "jdk.internal.reflect.*", "com.sun.*", "org.w3c.dom.*", "sun.misc.*"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,7 @@

import com.here.naksha.app.service.AbstractNakshaHubVerticle;
import com.here.naksha.app.service.NakshaApp;
import com.here.naksha.app.service.http.apis.Api;
import com.here.naksha.app.service.http.apis.EventHandlerApi;
import com.here.naksha.app.service.http.apis.HealthApi;
import com.here.naksha.app.service.http.apis.ReadFeatureApi;
import com.here.naksha.app.service.http.apis.SpaceApi;
import com.here.naksha.app.service.http.apis.StorageApi;
import com.here.naksha.app.service.http.apis.WriteFeatureApi;
import com.here.naksha.app.service.http.apis.*;
import com.here.naksha.app.service.http.auth.JWTPayload;
import com.here.naksha.app.service.http.auth.NakshaJwtAuthHandler;
import com.here.naksha.app.service.util.logging.AccessLog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"maintenanceIntervalInMins": 720,
"maintenancePoolCoreSize": 5,
"maintenancePoolMaxSize": 20,
"maxParallelRequestsPerCPU": 30,
"maxPctParallelRequestsPerActor": 100,
"storageParams": {
"pg_hint_plan": true,
"pg_stat_statements": true
Expand Down
2 changes: 2 additions & 0 deletions here-naksha-app-service/src/main/resources/test-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"maintenanceIntervalInMins": 720,
"maintenancePoolCoreSize": 5,
"maintenancePoolMaxSize": 20,
"maxParallelRequestsPerCPU": 30,
"maxPctParallelRequestsPerActor": 100,
"storageParams": {
"pg_hint_plan": false,
"pg_stat_statements": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import com.here.naksha.lib.core.exceptions.TooManyTasks;
import com.here.naksha.lib.core.util.NanoTime;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -53,12 +51,15 @@ public abstract class AbstractTask<RESULT, SELF extends AbstractTask<RESULT, SEL

private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);

/**
* The soft-limit of tasks to run concurrently. This limit does normally not apply to child tasks.
*/
public static final AtomicLong limit =
new AtomicLong(Math.max(1000, Runtime.getRuntime().availableProcessors() * 50L));
private static IRequestLimitManager requestLimitManager = new DefaultRequestLimitManager();

private String actor;

public static void setConcurrencyLimitManager(IRequestLimitManager newRequestLimitManager) {
requestLimitManager = newRequestLimitManager;
}

private static final ConcurrentHashMap<@NotNull String, @NotNull Long> actorUsageMap = new ConcurrentHashMap<>();
private static final AtomicLong taskId = new AtomicLong(1L);
private static final ThreadGroup allTasksGroup = new ThreadGroup("Naksha-Tasks");
private static final ConcurrentHashMap<Long, NakshaWorker> allTasks = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -248,7 +249,7 @@ public void uncaughtException(@NotNull Thread thread, @NotNull Throwable t) {
private boolean internal;

/**
* Flag this task as internal, so when starting the task, the maximum amount of parallel tasks {@link #limit} is ignored.
* Flag this task as internal, so when starting the task, the maximum amount of parallel tasks limit is ignored.
*
* @param internal {@code true} if this task is internal and therefore bypassing the maximum parallel tasks limit.
* @throws IllegalStateException if the task is not in the state {@link State#NEW}.
Expand Down Expand Up @@ -393,33 +394,31 @@ protected final void unlock() {
* @throws RuntimeException If adding the task to the thread pool failed for an unknown error.
*/
public @NotNull Future<@NotNull RESULT> start() {
final long LIMIT = AbstractTask.limit.get();
final long LIMIT = requestLimitManager.getInstanceLevelLimit();
this.actor = context.getActor();
lockAndRequireNew();
try {
do {
final long threadCount = AbstractTask.threadCount.get();
assert threadCount >= 0L;
if (!internal && threadCount >= LIMIT) {
throw new TooManyTasks();
}
if (AbstractTask.threadCount.compareAndSet(threadCount, threadCount + 1)) {
try {
state.set(State.START);
final Future<RESULT> future = threadPool.submit(this::init_and_execute);
return future;
} catch (RejectedExecutionException e) {
throw new TooManyTasks();
} catch (Throwable t) {
AbstractTask.threadCount.decrementAndGet();
log.atError()
.setMessage("Unexpected exception while trying to fork a new thread")
.setCause(t)
.log();
throw new RuntimeException("Internal error while forking new worker thread", t);
}
}
// Conflict, two threads concurrently try to fork.
} while (true);
final long ACTOR_LIMIT = requestLimitManager.getActorLevelLimit(context);
incActorLevelUsage(this.actor, ACTOR_LIMIT);
incInstanceLevelUsage(this.actor, LIMIT);
try {
state.set(State.START);
final Future<RESULT> future = threadPool.submit(this::init_and_execute);
return future;
} catch (RejectedExecutionException e) {
String errorMessage = "Maximum number of concurrent tasks (" + LIMIT + ") reached";
decInstanceLevelUsage();
decActorLevelUsage(this.actor);
throw new TooManyTasks(errorMessage);
} catch (Throwable t) {
decInstanceLevelUsage();
decActorLevelUsage(this.actor);
log.atError()
.setMessage("Unexpected exception while trying to fork a new thread")
.setCause(t)
.log();
throw new RuntimeException("Internal error while forking new worker thread", t);
}
} finally {
unlock();
}
Expand Down Expand Up @@ -451,7 +450,8 @@ protected final void unlock() {
} finally {
try {
state.set(State.DONE);
final long newValue = AbstractTask.threadCount.decrementAndGet();
final long newValue = decInstanceLevelUsage();
decActorLevelUsage(this.actor);
assert newValue >= 0L;
detachFromCurrentThread();
} catch (Throwable t) {
Expand Down Expand Up @@ -581,4 +581,125 @@ public final boolean removeListener(@NotNull Consumer<@NotNull RESULT> listener)
state.set(State.NEW);
}
}

/**
* Increments the value of instance level usage and compares with the specified limit.
*
* <p>This method ensures that the number of concurrent tasks for the instance
* does not exceed the specified limit. If the limit is reached, it logs an
* error and throws a {@link TooManyTasks} exception.
*
* @param actorId The identifier of the actor for which to acquire the slot.
* @param limit The maximum number of concurrent tasks allowed for the instance.
* @throws TooManyTasks If the maximum number of concurrent tasks is reached for the instance.
*/
private void incInstanceLevelUsage(String actorId, long limit) {
while (true) {
final long threadCount = AbstractTask.threadCount.get();
assert threadCount >= 0L;
if (!internal && threadCount >= limit) {
log.info(
"NAKSHA_ERR_REQ_LIMIT_4_INSTANCE - [Request Limit breached for Instance => appId,author,actor,limit,crtValue] - ReqLimitForInstance {} {} {} {} {}",
context.getAppId(),
context.getAuthor(),
actorId,
limit,
threadCount);
String errorMessage = "Maximum number of concurrent tasks reached for instance (" + limit + ")";
decActorLevelUsage(actorId);
throw new TooManyTasks(errorMessage);
}
if (AbstractTask.threadCount.compareAndSet(threadCount, threadCount + 1)) {
break;
}
// Failed, conflict, repeat
}
}

private long decInstanceLevelUsage() {
return AbstractTask.threadCount.decrementAndGet();
}

/**
* Increments the value of author usage for given actor and compares with the specified limit.
*
* <p>This method ensures that the number of concurrent tasks for the actor
* does not exceed the specified limit. If the limit is reached, it logs an
* error and throws a {@link TooManyTasks} exception.
*
* @param actorId The identifier of the actor for which to acquire the slot.
* @param limit The maximum number of concurrent tasks allowed for the actor.
* @throws TooManyTasks If the maximum number of concurrent tasks is reached for the actor.
*/
private void incActorLevelUsage(String actorId, long limit) {
if (actorId == null) return;
if (limit <= 0) {
log.info(
"NAKSHA_ERR_REQ_LIMIT_4_ACTOR - [Request Limit breached for Actor => appId,author,actor,limit,crtValue] - ReqLimitForActor {} {} {} {} {}",
context.getAppId(),
context.getAuthor(),
actorId,
limit,
0);
String errorMessage = "Maximum number of concurrent tasks reached for actor (" + limit + ")";
throw new TooManyTasks(errorMessage);
}
while (true) {
Long counter = actorUsageMap.get(actorId);
if (counter == null) {
Long existing = actorUsageMap.putIfAbsent(actorId, 1L);
if (existing != null) {
continue; // Repeat, conflict with other thread
}
return;
}
// Increment counter
if (!internal && counter >= limit) {
log.info(
"NAKSHA_ERR_REQ_LIMIT_4_ACTOR - [Request Limit breached for Actor => appId,author,actor,limit,crtValue] - ReqLimitForActor {} {} {} {} {}",
context.getAppId(),
context.getAuthor(),
actorId,
limit,
counter);
String errorMessage = "Maximum number of concurrent tasks reached for actor (" + limit + ")";
throw new TooManyTasks(errorMessage);
}
if (actorUsageMap.replace(actorId, counter, counter + 1)) {
break;
}
// Failed, conflict, repeat
}
}

/**
* decrements the value of author usage given actor identifier.
*
* <p>This method decrements the usage count for the actor identifier. If the usage count
* becomes zero, it removes the actor identifier from the map. If another thread attempts
* to release the slot concurrently, it repeats the process until successful.
*
* @param actorId The identifier of the actor for which to release the slot.
*/
private void decActorLevelUsage(String actorId) {
if (actorId == null) return;
while (true) {
Long current = actorUsageMap.get(actorId);
if (current == null) {
log.error("Invalid actor usage value for actor: " + actorId + " value: null");
break;
} else if (current <= 1) {
if (current <= 0) {
log.error("Invalid actor usage value for actor: " + actorId + " value: " + current);
}
if (!actorUsageMap.remove(actorId, current)) {
continue;
}
break;
} else if (actorUsageMap.replace(actorId, current, current - 1)) {
break;
}
// Failed, repeat, conflict with other thread
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (C) 2017-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/
package com.here.naksha.lib.core;
/**
* The DefaultRequestLimitManager class is an implementation of the IRequestLimitManager interface
* providing default behavior for retrieving request limits.
*/
public class DefaultRequestLimitManager implements IRequestLimitManager {
private final long instanceLevelLimit;
private final double actorLimitPct;

/**
* Retrieves the number of available processors.
*
* @return The number of available processors.
*/
private static long getAvailableProcessors() {
return Runtime.getRuntime().availableProcessors();
}

/**
* Constructs a DefaultRequestLimitManager instance with default values.
* The instance-level limit is calculated based on the available processors.
* This function is useful where Hub is not involved
*/
public DefaultRequestLimitManager() {
this.instanceLevelLimit = 30L * getAvailableProcessors();
this.actorLimitPct = 25; // 25%
}

/**
* Constructs a DefaultRequestLimitManager instance with custom values.
*
* @param cpuLevelLimit The limit per CPU level.
* @param actorLimitPct The percentage of actor limit.
*/
public DefaultRequestLimitManager(int cpuLevelLimit, int actorLimitPct) {
this.instanceLevelLimit = cpuLevelLimit * getAvailableProcessors();
this.actorLimitPct = actorLimitPct;
}

/**
* Retrieves the instance-level request limit.
*
* @return The instance-level request limit.
*/
@Override
public long getInstanceLevelLimit() {
return instanceLevelLimit;
}

/**
* Retrieves the request limit for a specific actor within the given context.
* The actor-level limit is calculated as a percentage of the instance-level limit.
*
* @param context The NakshaContext representing the context in which the actor operates.
* @return The request limit for the actor within the given context.
*/
@Override
public long getActorLevelLimit(NakshaContext context) {
return (long) ((instanceLevelLimit * actorLimitPct) / 100);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2017-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/
package com.here.naksha.lib.core;

/**
* The IRequestLimitManager interface defines methods for retrieving request limits
* at different levels - instance level and actor level.
*/
public interface IRequestLimitManager {
/**
* Retrieves the instance-level request limit.
*
* @return The instance-level request limit.
*/
long getInstanceLevelLimit();
/**
* Retrieves the request limit for a specific actor within the given context.
*
* @param context The NakshaContext representing the context in which the actor operates.
* @return The request limit for the actor within the given context.
*/
long getActorLevelLimit(NakshaContext context);
}
Loading

0 comments on commit 5670771

Please sign in to comment.