diff --git a/deployment/codedeploy/contents/naksha-hub/.config/cloud-config.json b/deployment/codedeploy/contents/naksha-hub/.config/cloud-config.json index b40963124..81a83670e 100644 --- a/deployment/codedeploy/contents/naksha-hub/.config/cloud-config.json +++ b/deployment/codedeploy/contents/naksha-hub/.config/cloud-config.json @@ -3,6 +3,8 @@ "type": "Config", "httpPort": 7080, "requestBodyLimit": 25, + "maxParallelRequestsPerCPU": 30, + "maxPctParallelRequestsPerActor": 100, "authMode": "JWT", "extensionConfigParams": { "whitelistClasses": [ "java.*", "javax.*", "com.here.*", "jdk.internal.reflect.*", "com.sun.*", "org.w3c.dom.*", "sun.misc.*"], diff --git a/here-naksha-app-service/src/main/java/com/here/naksha/app/service/http/NakshaHttpVerticle.java b/here-naksha-app-service/src/main/java/com/here/naksha/app/service/http/NakshaHttpVerticle.java index 00bcb18ac..60ec81e60 100644 --- a/here-naksha-app-service/src/main/java/com/here/naksha/app/service/http/NakshaHttpVerticle.java +++ b/here-naksha-app-service/src/main/java/com/here/naksha/app/service/http/NakshaHttpVerticle.java @@ -43,13 +43,7 @@ import com.here.naksha.app.service.AbstractNakshaHubVerticle; import com.here.naksha.app.service.NakshaApp; -import com.here.naksha.app.service.http.apis.Api; -import com.here.naksha.app.service.http.apis.EventHandlerApi; -import com.here.naksha.app.service.http.apis.HealthApi; -import com.here.naksha.app.service.http.apis.ReadFeatureApi; -import com.here.naksha.app.service.http.apis.SpaceApi; -import com.here.naksha.app.service.http.apis.StorageApi; -import com.here.naksha.app.service.http.apis.WriteFeatureApi; +import com.here.naksha.app.service.http.apis.*; import com.here.naksha.app.service.http.auth.JWTPayload; import com.here.naksha.app.service.http.auth.NakshaJwtAuthHandler; import com.here.naksha.app.service.util.logging.AccessLog; diff --git a/here-naksha-app-service/src/main/resources/test-config-with-extensions.json b/here-naksha-app-service/src/main/resources/test-config-with-extensions.json index bdfbde7ec..5be333f36 100644 --- a/here-naksha-app-service/src/main/resources/test-config-with-extensions.json +++ b/here-naksha-app-service/src/main/resources/test-config-with-extensions.json @@ -11,6 +11,8 @@ "maintenanceIntervalInMins": 720, "maintenancePoolCoreSize": 5, "maintenancePoolMaxSize": 20, + "maxParallelRequestsPerCPU": 30, + "maxPctParallelRequestsPerActor": 100, "storageParams": { "pg_hint_plan": true, "pg_stat_statements": true diff --git a/here-naksha-app-service/src/main/resources/test-config.json b/here-naksha-app-service/src/main/resources/test-config.json index cd72c7a8a..c5d31cad8 100644 --- a/here-naksha-app-service/src/main/resources/test-config.json +++ b/here-naksha-app-service/src/main/resources/test-config.json @@ -11,6 +11,8 @@ "maintenanceIntervalInMins": 720, "maintenancePoolCoreSize": 5, "maintenancePoolMaxSize": 20, + "maxParallelRequestsPerCPU": 30, + "maxPctParallelRequestsPerActor": 100, "storageParams": { "pg_hint_plan": false, "pg_stat_statements": false diff --git a/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/AbstractTask.java b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/AbstractTask.java index 772d8e0aa..c30b924c1 100644 --- a/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/AbstractTask.java +++ b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/AbstractTask.java @@ -21,9 +21,7 @@ import com.here.naksha.lib.core.exceptions.TooManyTasks; import com.here.naksha.lib.core.util.NanoTime; import java.lang.Thread.UncaughtExceptionHandler; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.List; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -53,12 +51,15 @@ public abstract class AbstractTask actorUsageMap = new ConcurrentHashMap<>(); private static final AtomicLong taskId = new AtomicLong(1L); private static final ThreadGroup allTasksGroup = new ThreadGroup("Naksha-Tasks"); private static final ConcurrentHashMap allTasks = new ConcurrentHashMap<>(); @@ -248,7 +249,7 @@ public void uncaughtException(@NotNull Thread thread, @NotNull Throwable t) { private boolean internal; /** - * Flag this task as internal, so when starting the task, the maximum amount of parallel tasks {@link #limit} is ignored. + * Flag this task as internal, so when starting the task, the maximum amount of parallel tasks limit is ignored. * * @param internal {@code true} if this task is internal and therefore bypassing the maximum parallel tasks limit. * @throws IllegalStateException if the task is not in the state {@link State#NEW}. @@ -393,33 +394,31 @@ protected final void unlock() { * @throws RuntimeException If adding the task to the thread pool failed for an unknown error. */ public @NotNull Future<@NotNull RESULT> start() { - final long LIMIT = AbstractTask.limit.get(); + final long LIMIT = requestLimitManager.getInstanceLevelLimit(); + this.actor = context.getActor(); lockAndRequireNew(); try { - do { - final long threadCount = AbstractTask.threadCount.get(); - assert threadCount >= 0L; - if (!internal && threadCount >= LIMIT) { - throw new TooManyTasks(); - } - if (AbstractTask.threadCount.compareAndSet(threadCount, threadCount + 1)) { - try { - state.set(State.START); - final Future future = threadPool.submit(this::init_and_execute); - return future; - } catch (RejectedExecutionException e) { - throw new TooManyTasks(); - } catch (Throwable t) { - AbstractTask.threadCount.decrementAndGet(); - log.atError() - .setMessage("Unexpected exception while trying to fork a new thread") - .setCause(t) - .log(); - throw new RuntimeException("Internal error while forking new worker thread", t); - } - } - // Conflict, two threads concurrently try to fork. - } while (true); + final long ACTOR_LIMIT = requestLimitManager.getActorLevelLimit(context); + incActorLevelUsage(this.actor, ACTOR_LIMIT); + incInstanceLevelUsage(this.actor, LIMIT); + try { + state.set(State.START); + final Future future = threadPool.submit(this::init_and_execute); + return future; + } catch (RejectedExecutionException e) { + String errorMessage = "Maximum number of concurrent tasks (" + LIMIT + ") reached"; + decInstanceLevelUsage(); + decActorLevelUsage(this.actor); + throw new TooManyTasks(errorMessage); + } catch (Throwable t) { + decInstanceLevelUsage(); + decActorLevelUsage(this.actor); + log.atError() + .setMessage("Unexpected exception while trying to fork a new thread") + .setCause(t) + .log(); + throw new RuntimeException("Internal error while forking new worker thread", t); + } } finally { unlock(); } @@ -451,7 +450,8 @@ protected final void unlock() { } finally { try { state.set(State.DONE); - final long newValue = AbstractTask.threadCount.decrementAndGet(); + final long newValue = decInstanceLevelUsage(); + decActorLevelUsage(this.actor); assert newValue >= 0L; detachFromCurrentThread(); } catch (Throwable t) { @@ -581,4 +581,125 @@ public final boolean removeListener(@NotNull Consumer<@NotNull RESULT> listener) state.set(State.NEW); } } + + /** + * Increments the value of instance level usage and compares with the specified limit. + * + *

This method ensures that the number of concurrent tasks for the instance + * does not exceed the specified limit. If the limit is reached, it logs an + * error and throws a {@link TooManyTasks} exception. + * + * @param actorId The identifier of the actor for which to acquire the slot. + * @param limit The maximum number of concurrent tasks allowed for the instance. + * @throws TooManyTasks If the maximum number of concurrent tasks is reached for the instance. + */ + private void incInstanceLevelUsage(String actorId, long limit) { + while (true) { + final long threadCount = AbstractTask.threadCount.get(); + assert threadCount >= 0L; + if (!internal && threadCount >= limit) { + log.info( + "NAKSHA_ERR_REQ_LIMIT_4_INSTANCE - [Request Limit breached for Instance => appId,author,actor,limit,crtValue] - ReqLimitForInstance {} {} {} {} {}", + context.getAppId(), + context.getAuthor(), + actorId, + limit, + threadCount); + String errorMessage = "Maximum number of concurrent tasks reached for instance (" + limit + ")"; + decActorLevelUsage(actorId); + throw new TooManyTasks(errorMessage); + } + if (AbstractTask.threadCount.compareAndSet(threadCount, threadCount + 1)) { + break; + } + // Failed, conflict, repeat + } + } + + private long decInstanceLevelUsage() { + return AbstractTask.threadCount.decrementAndGet(); + } + + /** + * Increments the value of author usage for given actor and compares with the specified limit. + * + *

This method ensures that the number of concurrent tasks for the actor + * does not exceed the specified limit. If the limit is reached, it logs an + * error and throws a {@link TooManyTasks} exception. + * + * @param actorId The identifier of the actor for which to acquire the slot. + * @param limit The maximum number of concurrent tasks allowed for the actor. + * @throws TooManyTasks If the maximum number of concurrent tasks is reached for the actor. + */ + private void incActorLevelUsage(String actorId, long limit) { + if (actorId == null) return; + if (limit <= 0) { + log.info( + "NAKSHA_ERR_REQ_LIMIT_4_ACTOR - [Request Limit breached for Actor => appId,author,actor,limit,crtValue] - ReqLimitForActor {} {} {} {} {}", + context.getAppId(), + context.getAuthor(), + actorId, + limit, + 0); + String errorMessage = "Maximum number of concurrent tasks reached for actor (" + limit + ")"; + throw new TooManyTasks(errorMessage); + } + while (true) { + Long counter = actorUsageMap.get(actorId); + if (counter == null) { + Long existing = actorUsageMap.putIfAbsent(actorId, 1L); + if (existing != null) { + continue; // Repeat, conflict with other thread + } + return; + } + // Increment counter + if (!internal && counter >= limit) { + log.info( + "NAKSHA_ERR_REQ_LIMIT_4_ACTOR - [Request Limit breached for Actor => appId,author,actor,limit,crtValue] - ReqLimitForActor {} {} {} {} {}", + context.getAppId(), + context.getAuthor(), + actorId, + limit, + counter); + String errorMessage = "Maximum number of concurrent tasks reached for actor (" + limit + ")"; + throw new TooManyTasks(errorMessage); + } + if (actorUsageMap.replace(actorId, counter, counter + 1)) { + break; + } + // Failed, conflict, repeat + } + } + + /** + * decrements the value of author usage given actor identifier. + * + *

This method decrements the usage count for the actor identifier. If the usage count + * becomes zero, it removes the actor identifier from the map. If another thread attempts + * to release the slot concurrently, it repeats the process until successful. + * + * @param actorId The identifier of the actor for which to release the slot. + */ + private void decActorLevelUsage(String actorId) { + if (actorId == null) return; + while (true) { + Long current = actorUsageMap.get(actorId); + if (current == null) { + log.error("Invalid actor usage value for actor: " + actorId + " value: null"); + break; + } else if (current <= 1) { + if (current <= 0) { + log.error("Invalid actor usage value for actor: " + actorId + " value: " + current); + } + if (!actorUsageMap.remove(actorId, current)) { + continue; + } + break; + } else if (actorUsageMap.replace(actorId, current, current - 1)) { + break; + } + // Failed, repeat, conflict with other thread + } + } } diff --git a/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/DefaultRequestLimitManager.java b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/DefaultRequestLimitManager.java new file mode 100644 index 000000000..3c65e0eb4 --- /dev/null +++ b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/DefaultRequestLimitManager.java @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2017-2023 HERE Europe B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ +package com.here.naksha.lib.core; +/** + * The DefaultRequestLimitManager class is an implementation of the IRequestLimitManager interface + * providing default behavior for retrieving request limits. + */ +public class DefaultRequestLimitManager implements IRequestLimitManager { + private final long instanceLevelLimit; + private final double actorLimitPct; + + /** + * Retrieves the number of available processors. + * + * @return The number of available processors. + */ + private static long getAvailableProcessors() { + return Runtime.getRuntime().availableProcessors(); + } + + /** + * Constructs a DefaultRequestLimitManager instance with default values. + * The instance-level limit is calculated based on the available processors. + * This function is useful where Hub is not involved + */ + public DefaultRequestLimitManager() { + this.instanceLevelLimit = 30L * getAvailableProcessors(); + this.actorLimitPct = 25; // 25% + } + + /** + * Constructs a DefaultRequestLimitManager instance with custom values. + * + * @param cpuLevelLimit The limit per CPU level. + * @param actorLimitPct The percentage of actor limit. + */ + public DefaultRequestLimitManager(int cpuLevelLimit, int actorLimitPct) { + this.instanceLevelLimit = cpuLevelLimit * getAvailableProcessors(); + this.actorLimitPct = actorLimitPct; + } + + /** + * Retrieves the instance-level request limit. + * + * @return The instance-level request limit. + */ + @Override + public long getInstanceLevelLimit() { + return instanceLevelLimit; + } + + /** + * Retrieves the request limit for a specific actor within the given context. + * The actor-level limit is calculated as a percentage of the instance-level limit. + * + * @param context The NakshaContext representing the context in which the actor operates. + * @return The request limit for the actor within the given context. + */ + @Override + public long getActorLevelLimit(NakshaContext context) { + return (long) ((instanceLevelLimit * actorLimitPct) / 100); + } +} diff --git a/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/IRequestLimitManager.java b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/IRequestLimitManager.java new file mode 100644 index 000000000..12321d1b1 --- /dev/null +++ b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/IRequestLimitManager.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2017-2023 HERE Europe B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ +package com.here.naksha.lib.core; + +/** + * The IRequestLimitManager interface defines methods for retrieving request limits + * at different levels - instance level and actor level. + */ +public interface IRequestLimitManager { + /** + * Retrieves the instance-level request limit. + * + * @return The instance-level request limit. + */ + long getInstanceLevelLimit(); + /** + * Retrieves the request limit for a specific actor within the given context. + * + * @param context The NakshaContext representing the context in which the actor operates. + * @return The request limit for the actor within the given context. + */ + long getActorLevelLimit(NakshaContext context); +} diff --git a/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/NakshaContext.java b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/NakshaContext.java index 184fb54a3..cabacb9d5 100644 --- a/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/NakshaContext.java +++ b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/NakshaContext.java @@ -324,6 +324,14 @@ public void attachStreamInfo(final @Nullable StreamInfo streamInfo) { return appId; } + /** + * Returns the value of author. If author is null then it returns the appId. + */ + @AvailableSince(NakshaVersion.v2_0_15) + public String getActor() { + return author != null ? author : app_id; + } + /** * Returns the raw application identifier. * diff --git a/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/exceptions/TooManyTasks.java b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/exceptions/TooManyTasks.java index 9068fd9f2..46eaf85bd 100644 --- a/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/exceptions/TooManyTasks.java +++ b/here-naksha-lib-core/src/main/java/com/here/naksha/lib/core/exceptions/TooManyTasks.java @@ -26,7 +26,7 @@ */ public class TooManyTasks extends RuntimeException { - public TooManyTasks() { - super("Maximum number of concurrent tasks (" + AbstractTask.limit.get() + ") reached"); + public TooManyTasks(String errorMessage) { + super(errorMessage); } } diff --git a/here-naksha-lib-hub/src/main/java/com/here/naksha/lib/hub/NakshaHub.java b/here-naksha-lib-hub/src/main/java/com/here/naksha/lib/hub/NakshaHub.java index 497eff8a5..305b5cf62 100644 --- a/here-naksha-lib-hub/src/main/java/com/here/naksha/lib/hub/NakshaHub.java +++ b/here-naksha-lib-hub/src/main/java/com/here/naksha/lib/hub/NakshaHub.java @@ -27,10 +27,7 @@ import static com.here.naksha.lib.core.util.storage.ResultHelper.readFeatureFromResult; import com.fasterxml.jackson.databind.ObjectMapper; -import com.here.naksha.lib.core.INaksha; -import com.here.naksha.lib.core.NakshaAdminCollection; -import com.here.naksha.lib.core.NakshaContext; -import com.here.naksha.lib.core.NakshaVersion; +import com.here.naksha.lib.core.*; import com.here.naksha.lib.core.exceptions.NoCursor; import com.here.naksha.lib.core.exceptions.StorageNotFoundException; import com.here.naksha.lib.core.lambdas.Fe1; @@ -123,6 +120,13 @@ public NakshaHub( } else { logger.warn("ExtensionManager is not initialised due to extensionConfigParams not found."); } + // Setting Concurrency Thresholds + logger.info("Value of maxParallelRequestsPerCPU is {}", nakshaHubConfig.maxParallelRequestsPerCPU); + logger.info("Value of maxPctParallelRequestsPerActor is {}", nakshaHubConfig.maxPctParallelRequestsPerActor); + IRequestLimitManager requestLimitManager = new DefaultRequestLimitManager( + nakshaHubConfig.maxParallelRequestsPerCPU, nakshaHubConfig.maxPctParallelRequestsPerActor); + AbstractTask.setConcurrencyLimitManager(requestLimitManager); + logger.info("NakshaHub initialization done!"); } diff --git a/here-naksha-lib-hub/src/main/java/com/here/naksha/lib/hub/NakshaHubConfig.java b/here-naksha-lib-hub/src/main/java/com/here/naksha/lib/hub/NakshaHubConfig.java index d5a33684b..a1366d356 100644 --- a/here-naksha-lib-hub/src/main/java/com/here/naksha/lib/hub/NakshaHubConfig.java +++ b/here-naksha-lib-hub/src/main/java/com/here/naksha/lib/hub/NakshaHubConfig.java @@ -103,7 +103,9 @@ public final class NakshaHubConfig extends XyzFeature implements JsonSerializabl @JsonProperty("maintenancePoolMaxSize") @Nullable Integer maintenancePoolMaxSize, @JsonProperty("storageParams") @Nullable Map storageParams, @JsonProperty("extensionConfigParams") @Nullable ExtensionConfigParams extensionConfigParams, - @JsonProperty("requestBodyLimit") @Nullable Integer requestBodyLimit) { + @JsonProperty("requestBodyLimit") @Nullable Integer requestBodyLimit, + @JsonProperty("maxParallelRequestsPerCPU") @Nullable Integer maxParallelRequestsPerCPU, + @JsonProperty("maxPctParallelRequestsPerActor") @Nullable Integer maxPctParallelRequestsPerActor) { super(id); if (httpPort != null && (httpPort < 0 || httpPort > 65535)) { logger.atError() @@ -193,6 +195,11 @@ public final class NakshaHubConfig extends XyzFeature implements JsonSerializabl } else { this.requestBodyLimit = requestBodyLimit; } + this.maxParallelRequestsPerCPU = + maxParallelRequestsPerCPU != null ? maxParallelRequestsPerCPU : defaultMaxParallelRequestsPerCPU(); + this.maxPctParallelRequestsPerActor = maxPctParallelRequestsPerActor != null + ? maxPctParallelRequestsPerActor + : defaultMaxPctParallelRequestsPerActor(); } private String getEnv() { @@ -349,6 +356,23 @@ public static int defaultMaintenancePoolMaxSize() { return 5; } + /** + * Returns a default threshold per processor for concurrency + * + * @return the default threshold per processor + */ + public static int defaultMaxParallelRequestsPerCPU() { + return 30; + } + + /** + * Returns a default percentage threshold per principal for concurrency + * + * @return the default percentage threshold per principal + */ + public static int defaultMaxPctParallelRequestsPerActor() { + return 25; + } /** * Optional storage-specific parameters */ @@ -363,6 +387,16 @@ public static int defaultMaintenancePoolMaxSize() { */ public final Integer requestBodyLimit; + /** + * Optional Total Concurrency Limit + */ + public final Integer maxParallelRequestsPerCPU; + + /** + * Optional Total Author Concurrency Threshold + */ + public final Integer maxPctParallelRequestsPerActor; + public static final String NAKSHA_AUTH = "authMode"; /** diff --git a/here-naksha-lib-hub/src/main/resources/config/default-config.json b/here-naksha-lib-hub/src/main/resources/config/default-config.json index 02c048eec..663e1dfc6 100644 --- a/here-naksha-lib-hub/src/main/resources/config/default-config.json +++ b/here-naksha-lib-hub/src/main/resources/config/default-config.json @@ -11,5 +11,7 @@ "maintenanceIntervalInMins": 720, "maintenancePoolCoreSize": 5, "maintenancePoolMaxSize": 20, + "maxParallelRequestsPerCPU": 30, + "maxPctParallelRequestsPerActor": 100, "extensionConfigParams": null } \ No newline at end of file diff --git a/here-naksha-lib-view/src/main/java/com/here/naksha/lib/view/concurrent/ParallelQueryExecutor.java b/here-naksha-lib-view/src/main/java/com/here/naksha/lib/view/concurrent/ParallelQueryExecutor.java index 1cf7f04d2..b9b3e5e82 100644 --- a/here-naksha-lib-view/src/main/java/com/here/naksha/lib/view/concurrent/ParallelQueryExecutor.java +++ b/here-naksha-lib-view/src/main/java/com/here/naksha/lib/view/concurrent/ParallelQueryExecutor.java @@ -22,6 +22,7 @@ import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; +import com.here.naksha.lib.core.NakshaContext; import com.here.naksha.lib.core.exceptions.NoCursor; import com.here.naksha.lib.core.models.storage.FeatureCodec; import com.here.naksha.lib.core.models.storage.FeatureCodecFactory; @@ -57,7 +58,8 @@ Map>> queryInParallel( List>>> futures = new ArrayList<>(); for (LayerReadRequest layerReadRequest : requests) { - QueryTask>> singleTask = new QueryTask<>(); + QueryTask>> singleTask = + new QueryTask<>(null, NakshaContext.currentContext()); Future>> futureResult = singleTask.start(() -> executeSingle( layerReadRequest.getViewLayer(), diff --git a/here-naksha-lib-view/src/main/java/com/here/naksha/lib/view/concurrent/QueryTask.java b/here-naksha-lib-view/src/main/java/com/here/naksha/lib/view/concurrent/QueryTask.java index 88abd4e0e..8732ec251 100644 --- a/here-naksha-lib-view/src/main/java/com/here/naksha/lib/view/concurrent/QueryTask.java +++ b/here-naksha-lib-view/src/main/java/com/here/naksha/lib/view/concurrent/QueryTask.java @@ -20,10 +20,15 @@ import static com.here.naksha.lib.core.exceptions.UncheckedException.unchecked; +import com.here.naksha.lib.core.INaksha; +import com.here.naksha.lib.core.NakshaContext; import com.here.naksha.lib.core.SimpleTask; import org.jetbrains.annotations.NotNull; class QueryTask extends SimpleTask { + public QueryTask(INaksha naksha, NakshaContext context) { + super(naksha, context); + } @Override protected @NotNull RESULT errorResponse(@NotNull Throwable throwable) { diff --git a/here-naksha-lib-view/src/test/java/com/here/naksha/lib/view/ViewTest.java b/here-naksha-lib-view/src/test/java/com/here/naksha/lib/view/ViewTest.java index 16ca42e85..4395cae7f 100644 --- a/here-naksha-lib-view/src/test/java/com/here/naksha/lib/view/ViewTest.java +++ b/here-naksha-lib-view/src/test/java/com/here/naksha/lib/view/ViewTest.java @@ -18,9 +18,7 @@ */ package com.here.naksha.lib.view; -import com.here.naksha.lib.core.AbstractTask; -import com.here.naksha.lib.core.NakshaContext; -import com.here.naksha.lib.core.SimpleTask; +import com.here.naksha.lib.core.*; import com.here.naksha.lib.core.exceptions.NoCursor; import com.here.naksha.lib.core.exceptions.TooManyTasks; import com.here.naksha.lib.core.exceptions.UncheckedException; @@ -239,8 +237,10 @@ void testTimeoutExceptionInOneOfTheThreads() { @Test void shouldThrowTooManyTasksException() { IStorage mockStorage = mock(IStorage.class); - int limit = AbstractTask.limit.intValue(); - ViewLayer[] layerDS = new ViewLayer[limit + 10]; + IRequestLimitManager requestLimitManager= new DefaultRequestLimitManager(30,100); + AbstractTask.setConcurrencyLimitManager(requestLimitManager); + long limit = requestLimitManager.getInstanceLevelLimit(); + ViewLayer[] layerDS = new ViewLayer[(int) (limit + 10)]; //Create ThreadFactory Limit + 10 layers for (int ind = 0; ind < layerDS.length; ind++) { layerDS[ind] = new ViewLayer(mockStorage, "collection" + ind); @@ -255,7 +255,7 @@ void shouldThrowTooManyTasksException() { public Object answer(InvocationOnMock invocation) throws Throwable { List requests = invocation.getArgument(0); for (LayerReadRequest layerReadRequest : requests) { - SimpleTask singleTask = new SimpleTask<>(); + SimpleTask singleTask = new SimpleTask<>(null,nc); tasks.add(singleTask); singleTask.start(() -> { Thread.sleep(1000);