From f7e0dad9cb7919e6289f55c643252cbe78d96fa3 Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Fri, 27 Sep 2024 15:24:09 +0200 Subject: [PATCH 01/10] Concurrency limits module, and support in Helidon WebServer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tomas Langer Co-authored-by: André Rouél Signed-off-by: Tomas Langer --- all/pom.xml | 4 + bom/pom.xml | 5 + common/concurrency/limits/README.md | 14 ++ common/concurrency/limits/pom.xml | 117 ++++++++++ .../common/concurrency/limits/AimdLimit.java | 129 ++++++++++ .../limits/AimdLimitConfigBlueprint.java | 97 ++++++++ .../concurrency/limits/AimdLimitImpl.java | 159 +++++++++++++ .../concurrency/limits/AimdLimitProvider.java | 47 ++++ .../common/concurrency/limits/BasicLimit.java | 221 ++++++++++++++++++ .../limits/BasicLimitConfigBlueprint.java | 68 ++++++ .../limits/BasicLimitProvider.java | 47 ++++ .../limits/IgnoreTaskException.java | 77 ++++++ .../common/concurrency/limits/Limit.java | 58 +++++ .../concurrency/limits/LimitException.java | 45 ++++ .../concurrency/limits}/NoopSemaphore.java | 20 +- .../concurrency/limits/SemaphoreLimit.java | 39 ++++ .../concurrency/limits/package-info.java | 23 ++ .../concurrency/limits/spi/LimitProvider.java | 28 +++ .../concurrency/limits/spi/package-info.java | 20 ++ .../limits/src/main/java/module-info.java | 35 +++ .../resources/META-INF/helidon/service.loader | 2 + .../concurrency/limits/AimdLimitTest.java | 168 +++++++++++++ .../concurrency/limits/BasicLimitTest.java | 129 ++++++++++ common/concurrency/pom.xml | 39 ++++ common/pom.xml | 1 + fault-tolerance/fault-tolerance/pom.xml | 4 + .../faulttolerance/BulkheadLimitProvider.java | 116 +++++++++ .../src/main/java/module-info.java | 4 + microprofile/websocket/pom.xml | 4 + .../microprofile/tyrus/TyrusConnection.java | 60 +++-- .../websocket/src/main/java/module-info.java | 3 +- webserver/concurrency-limits/pom.xml | 134 +++++++++++ .../concurrency/limits/LimitsFeature.java | 147 ++++++++++++ .../limits/LimitsFeatureConfigBlueprint.java | 76 ++++++ .../limits/LimitsFeatureProvider.java | 58 +++++ .../limits/LimitsRoutingFeature.java | 67 ++++++ .../concurrency/limits/package-info.java | 20 ++ .../src/main/java/module-info.java | 35 +++ .../concurrency/limits/BulkheadTest.java | 87 +++++++ .../src/test/resources/application.yaml | 23 ++ .../src/test/resources/logging.properties | 21 ++ webserver/http2/pom.xml | 4 + .../webserver/http2/Http2Connection.java | 27 ++- .../webserver/http2/Http2ServerStream.java | 25 +- .../http2/src/main/java/module-info.java | 1 + webserver/pom.xml | 1 + webserver/webserver/pom.xml | 4 + .../helidon/webserver/ConnectionHandler.java | 9 +- .../webserver/ListenerConfigBlueprint.java | 17 ++ .../io/helidon/webserver/ServerListener.java | 22 +- .../webserver/http1/Http1Connection.java | 24 +- .../webserver/spi/ServerConnection.java | 27 ++- .../webserver/src/main/java/module-info.java | 3 +- webserver/websocket/pom.xml | 4 + .../webserver/websocket/WsConnection.java | 63 +++-- .../websocket/src/main/java/module-info.java | 1 + 56 files changed, 2588 insertions(+), 95 deletions(-) create mode 100644 common/concurrency/limits/README.md create mode 100644 common/concurrency/limits/pom.xml create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitProvider.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimit.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimitConfigBlueprint.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimitProvider.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/IgnoreTaskException.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitException.java rename {webserver/webserver/src/main/java/io/helidon/webserver => common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits}/NoopSemaphore.java (79%) create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/SemaphoreLimit.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/package-info.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/LimitProvider.java create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/package-info.java create mode 100644 common/concurrency/limits/src/main/java/module-info.java create mode 100644 common/concurrency/limits/src/main/resources/META-INF/helidon/service.loader create mode 100644 common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java create mode 100644 common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/BasicLimitTest.java create mode 100644 common/concurrency/pom.xml create mode 100644 fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/BulkheadLimitProvider.java create mode 100644 webserver/concurrency-limits/pom.xml create mode 100644 webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeature.java create mode 100644 webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureConfigBlueprint.java create mode 100644 webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureProvider.java create mode 100644 webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsRoutingFeature.java create mode 100644 webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/package-info.java create mode 100644 webserver/concurrency-limits/src/main/java/module-info.java create mode 100644 webserver/concurrency-limits/src/test/java/io/helidon/webserver/concurrency/limits/BulkheadTest.java create mode 100644 webserver/concurrency-limits/src/test/resources/application.yaml create mode 100644 webserver/concurrency-limits/src/test/resources/logging.properties diff --git a/all/pom.xml b/all/pom.xml index 28f689ada92..e8c28bafd27 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -463,6 +463,10 @@ io.helidon.common.features helidon-common-features + + io.helidon.common.concurrency + helidon-common-concurrency-limits + io.helidon.dbclient helidon-dbclient diff --git a/bom/pom.xml b/bom/pom.xml index 73de6d8cff9..273f33d063e 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -610,6 +610,11 @@ helidon-common-features ${helidon.version} + + io.helidon.common.concurrency + helidon-common-concurrency-limits + ${helidon.version} + diff --git a/common/concurrency/limits/README.md b/common/concurrency/limits/README.md new file mode 100644 index 00000000000..70d3cd37132 --- /dev/null +++ b/common/concurrency/limits/README.md @@ -0,0 +1,14 @@ +Concurrency Limits +----- + +This module provides concurrency limits, so we can limit the number of concurrent, in-progress operations (for example in WebServer). + +The implemented concurrency limits are: + +| Key | Weight | Description | +|-------------|--------|----------------------------------------------------------------------------------------------------------------------| +| `semaphore` | `90` | Semaphore based concurrency limit (highest weight, so the default), if max set to `0`, we have unlimited concurrency | +| `aimd` | `80` | AIMD based limit (additive-increase/multiplicative-decrease) | +| `bulkhead` | `70` | Uses a bulkhead with a queue, implementation provided by `helidon-fault-tolerance` | + +Current usage: `helidon-webserver` \ No newline at end of file diff --git a/common/concurrency/limits/pom.xml b/common/concurrency/limits/pom.xml new file mode 100644 index 00000000000..2e2268bc4ae --- /dev/null +++ b/common/concurrency/limits/pom.xml @@ -0,0 +1,117 @@ + + + + + 4.0.0 + + io.helidon.common.concurrency + helidon-common-concurrency-project + 4.1.0-SNAPSHOT + ../pom.xml + + + helidon-common-concurrency-limits + Helidon Common Concurrency Limits + + + + io.helidon.common + helidon-common + + + io.helidon.builder + helidon-builder-api + + + io.helidon.common + helidon-common-config + + + io.helidon.service + helidon-service-registry + true + + + org.hamcrest + hamcrest-all + test + + + org.junit.jupiter + junit-jupiter-api + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + io.helidon.codegen + helidon-codegen-apt + ${helidon.version} + + + io.helidon.builder + helidon-builder-codegen + ${helidon.version} + + + io.helidon.config.metadata + helidon-config-metadata-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-helidon-copyright + ${helidon.version} + + + + + + io.helidon.codegen + helidon-codegen-apt + ${helidon.version} + + + io.helidon.builder + helidon-builder-codegen + ${helidon.version} + + + io.helidon.config.metadata + helidon-config-metadata-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-helidon-copyright + ${helidon.version} + + + + + + diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java new file mode 100644 index 00000000000..b19f7bfaabf --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.function.Consumer; + +import io.helidon.builder.api.RuntimeType; +import io.helidon.common.config.Config; + +/** + * AIMD based limiter. + *

+ * The additive-increase/multiplicative-decrease (AIMD) algorithm is a feedback control algorithm best known for its use in TCP + * congestion control. AIMD combines linear growth of the congestion window when there is no congestion with an exponential + * reduction when congestion is detected. + */ +@SuppressWarnings("removal") +@RuntimeType.PrototypedBy(AimdLimitConfig.class) +public class AimdLimit implements Limit, SemaphoreLimit, RuntimeType.Api { + static final String TYPE = "aimd"; + + private final AimdLimitConfig config; + private final AimdLimitImpl aimdLimitImpl; + + private AimdLimit(AimdLimitConfig config) { + this.config = config; + this.aimdLimitImpl = new AimdLimitImpl(config); + } + + /** + * Create a new fluent API builder to construct {@link io.helidon.common.concurrency.limits.AimdLimit} + * instance. + * + * @return fluent API builder + */ + public static AimdLimitConfig.Builder builder() { + return AimdLimitConfig.builder(); + } + + /** + * Create a new instance with all defaults. + * + * @return a new limit instance + */ + public static AimdLimit create() { + return builder().build(); + } + + /** + * Create a new instance from configuration. + * + * @param config configuration of the AIMD limit + * @return a new limit instance configured from {@code config} + */ + public static AimdLimit create(Config config) { + return builder() + .config(config) + .build(); + } + + /** + * Create a new instance from configuration. + * + * @param config configuration of the AIMD limit + * @return a new limit instance configured from {@code config} + */ + public static AimdLimit create(AimdLimitConfig config) { + return new AimdLimit(config); + } + + /** + * Create a new instance customizing its configuration. + * + * @param consumer consumer of configuration builder + * @return a new limit instance configured from the builder + */ + public static AimdLimit create(Consumer consumer) { + return builder() + .update(consumer) + .build(); + } + + @Override + public T invoke(Callable callable) throws Exception { + return aimdLimitImpl.invoke(callable); + } + + @Override + public void invoke(Runnable runnable) throws Exception { + aimdLimitImpl.invoke(runnable); + } + + @SuppressWarnings("removal") + @Override + public Semaphore semaphore() { + return aimdLimitImpl.semaphore(); + } + + @Override + public String name() { + return config.name(); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public AimdLimitConfig prototype() { + return config; + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java new file mode 100644 index 00000000000..400fbb99682 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.Optional; +import java.util.function.Supplier; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.spi.LimitProvider; + +/** + * Configuration of {@link io.helidon.common.concurrency.limits.AimdLimit}. + */ +@Prototype.Blueprint +@Prototype.Configured(value = AimdLimit.TYPE, root = false) +@Prototype.Provides(LimitProvider.class) +interface AimdLimitConfigBlueprint extends Prototype.Factory { + /** + * Backoff ratio to use for the algorithm. + * The value must be within [0.5, 1.0). + * + * @return backoff ratio + */ + @Option.Configured + @Option.DefaultDouble(0.9) + double backoffRatio(); + + /** + * Initial limit. + * The value must be within [{@link #minLimit()}, {@link #maxLimit()}]. + * + * @return initial limit + */ + @Option.Configured + @Option.DefaultInt(20) + int initialLimit(); + + /** + * Maximal limit. + * The value must be same or higher than {@link #minLimit()}. + * + * @return maximal limit + */ + @Option.Configured + @Option.DefaultInt(200) + int maxLimit(); + + /** + * Minimal limit. + * The value must be same or lower than {@link #maxLimit()}. + * + * @return minimal limit + */ + @Option.Configured + @Option.DefaultInt(20) + int minLimit(); + + /** + * Timeout that when exceeded is the same as if the task failed. + * + * @return task timeout, defaults to 5 seconds + */ + @Option.Configured + @Option.Default("PT5S") + Duration timeout(); + + /** + * A clock that supplies nanosecond time. + * + * @return supplier of current nanoseconds, defaults to {@link java.lang.System#nanoTime()} + */ + Optional> clock(); + + /** + * Name of this instance. + * + * @return name of the instance + */ + @Option.Default(AimdLimit.TYPE) + String name(); +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java new file mode 100644 index 00000000000..8e568c36c66 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.io.Serial; +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import io.helidon.common.config.ConfigException; + +class AimdLimitImpl { + private final double backoffRatio; + private final long timeoutInNanos; + private final int minLimit; + private final int maxLimit; + + private final Supplier clock; + private final AtomicInteger concurrentRequests; + private final AdjustableSemaphore semaphore; + + private final AtomicInteger limit; + private final Lock limitLock = new ReentrantLock(); + + AimdLimitImpl(AimdLimitConfig config) { + int initialLimit = config.initialLimit(); + this.backoffRatio = config.backoffRatio(); + this.timeoutInNanos = config.timeout().toNanos(); + this.minLimit = config.minLimit(); + this.maxLimit = config.maxLimit(); + this.clock = config.clock().orElseGet(() -> System::nanoTime); + + this.concurrentRequests = new AtomicInteger(); + this.semaphore = new AdjustableSemaphore(initialLimit); + + this.limit = new AtomicInteger(initialLimit); + + if (!(backoffRatio < 1.0 && backoffRatio >= 0.5)) { + throw new ConfigException("Backoff ratio must be within [0.5, 1.0)"); + } + if (maxLimit < minLimit) { + throw new ConfigException("Max limit must be higher than min limit"); + } + if (initialLimit > maxLimit) { + throw new ConfigException("Initial limit must be lower than max limit"); + } + if (initialLimit < minLimit) { + throw new ConfigException("Initial limit must be higher than minimum limit"); + } + + } + + Semaphore semaphore() { + return semaphore; + } + + int currentLimit() { + return limit.get(); + } + + void invoke(Runnable runnable) throws Exception { + invoke(() -> { + runnable.run(); + return null; + }); + } + + T invoke(Callable callable) throws Exception { + long startTime = clock.get(); + int currentRequests = concurrentRequests.incrementAndGet(); + + if (semaphore.tryAcquire()) { + try { + T response = callable.call(); + updateWithSample(startTime, clock.get(), currentRequests, true); + return response; + } catch (IgnoreTaskException e) { + return e.handle(); + } catch (Throwable e) { + updateWithSample(startTime, clock.get(), currentRequests, false); + throw e; + } finally { + concurrentRequests.decrementAndGet(); + semaphore.release(); + } + } else { + throw new LimitException("No more permits available for the semaphore"); + } + } + + void updateWithSample(long startTime, long endTime, int currentRequests, boolean success) { + long rtt = endTime - startTime; + + int currentLimit = limit.get(); + if (rtt > timeoutInNanos || !success) { + currentLimit = (int) (currentLimit * backoffRatio); + } else if (currentRequests * 2 >= currentLimit) { + currentLimit = currentLimit + 1; + } + setLimit(Math.min(maxLimit, Math.max(minLimit, currentLimit))); + } + + private void setLimit(int newLimit) { + if (newLimit == limit.get()) { + // already have the correct limit + return; + } + // now we lock, to do this only once in parallel, + // as otherwise we may end up in strange lands + limitLock.lock(); + try { + int oldLimit = limit.get(); + if (oldLimit == newLimit) { + // parallel thread already fixed it + return; + } + limit.set(newLimit); + + if (newLimit > oldLimit) { + this.semaphore.release(newLimit - oldLimit); + } else { + this.semaphore.reducePermits(oldLimit - newLimit); + } + } finally { + limitLock.unlock(); + } + } + + private static final class AdjustableSemaphore extends Semaphore { + @Serial + private static final long serialVersionUID = 114L; + + private AdjustableSemaphore(int permits) { + super(permits); + } + + @Override + protected void reducePermits(int reduction) { + super.reducePermits(reduction); + } + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitProvider.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitProvider.java new file mode 100644 index 00000000000..ae0af21e0ab --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import io.helidon.common.Weight; +import io.helidon.common.concurrency.limits.spi.LimitProvider; +import io.helidon.common.config.Config; + +/** + * {@link java.util.ServiceLoader} service provider for {@link io.helidon.common.concurrency.limits.AimdLimit} + * limit implementation. + */ +@Weight(80) +public class AimdLimitProvider implements LimitProvider { + /** + * Constructor required by the service loader. + */ + public AimdLimitProvider() { + } + + @Override + public String configKey() { + return AimdLimit.TYPE; + } + + @Override + public Limit create(Config config, String name) { + return AimdLimit.builder() + .config(config) + .name(name) + .build(); + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimit.java new file mode 100644 index 00000000000..e665089cf4a --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimit.java @@ -0,0 +1,221 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.function.Consumer; + +import io.helidon.builder.api.RuntimeType; +import io.helidon.common.config.Config; + +/** + * Basic limit, that provides {@link java.util.concurrent.Semaphore} like feature of limiting + * concurrent executions to a fixed number. + */ +@SuppressWarnings("removal") +@RuntimeType.PrototypedBy(BasicLimitConfig.class) +public class BasicLimit implements Limit, SemaphoreLimit, RuntimeType.Api { + /** + * Default limit, meaning unlimited execution. + */ + public static final int DEFAULT_LIMIT = 0; + static final String TYPE = "semaphore"; + + private final BasicLimitConfig config; + private final LimiterHandler handler; + + private BasicLimit(BasicLimitConfig config) { + this.config = config; + if (config.semaphore().isPresent()) { + this.handler = new RealSemaphoreHandler(config.semaphore().get()); + } else if (config.permits() == 0) { + this.handler = new NoOpSemaphoreHandler(); + } else { + this.handler = new RealSemaphoreHandler(new Semaphore(config.permits(), config.fair())); + } + } + + /** + * Create a new fluent API builder to construct {@link io.helidon.common.concurrency.limits.BasicLimit} + * instance. + * + * @return fluent API builder + */ + public static BasicLimitConfig.Builder builder() { + return BasicLimitConfig.builder(); + } + + /** + * Create a new instance with all defaults (no limit). + * + * @return a new limit instance + */ + public static BasicLimit create() { + return builder().build(); + } + + /** + * Create an instance from the provided semaphore. + * + * @param semaphore semaphore to use + * @return a new basic limit backed by the provided semaphore + */ + public static BasicLimit create(Semaphore semaphore) { + return builder() + .semaphore(semaphore) + .build(); + } + + /** + * Create a new instance from configuration. + * + * @param config configuration of the basic limit + * @return a new limit instance configured from {@code config} + */ + public static BasicLimit create(Config config) { + return builder() + .config(config) + .build(); + } + + /** + * Create a new instance from configuration. + * + * @param config configuration of the basic limit + * @return a new limit instance configured from {@code config} + */ + public static BasicLimit create(BasicLimitConfig config) { + return new BasicLimit(config); + } + + /** + * Create a new instance customizing its configuration. + * + * @param consumer consumer of configuration builder + * @return a new limit instance configured from the builder + */ + public static BasicLimit create(Consumer consumer) { + return builder() + .update(consumer) + .build(); + } + + @Override + public T invoke(Callable callable) throws Exception { + return handler.invoke(callable); + } + + @Override + public void invoke(Runnable runnable) throws Exception { + handler.invoke(runnable); + } + + @SuppressWarnings("removal") + @Override + public Semaphore semaphore() { + return handler.semaphore(); + } + + @Override + public BasicLimitConfig prototype() { + return config; + } + + @Override + public String name() { + return config.name(); + } + + @Override + public String type() { + return BasicLimit.TYPE; + } + + @SuppressWarnings("removal") + private interface LimiterHandler extends SemaphoreLimit { + T invoke(Callable callable) throws Exception; + void invoke(Runnable runnable) throws Exception; + } + + private static class NoOpSemaphoreHandler implements LimiterHandler { + @Override + public T invoke(Callable callable) throws Exception { + try { + return callable.call(); + } catch (IgnoreTaskException e) { + return e.handle(); + } + } + + @Override + public void invoke(Runnable runnable) { + runnable.run(); + } + + @SuppressWarnings("removal") + @Override + public Semaphore semaphore() { + return NoopSemaphore.INSTANCE; + } + } + + @SuppressWarnings("removal") + private static class RealSemaphoreHandler implements LimiterHandler { + private final Semaphore semaphore; + + private RealSemaphoreHandler(Semaphore semaphore) { + this.semaphore = semaphore; + } + + @Override + public T invoke(Callable callable) throws Exception { + if (semaphore.tryAcquire()) { + try { + return callable.call(); + } catch (IgnoreTaskException e) { + return e.handle(); + } finally { + semaphore.release(); + } + } else { + throw new LimitException("No more permits available for the semaphore"); + } + } + + @Override + public void invoke(Runnable runnable) throws Exception { + if (semaphore.tryAcquire()) { + try { + runnable.run(); + } catch (IgnoreTaskException e) { + e.handle(); + } finally { + semaphore.release(); + } + } else { + throw new LimitException("No more permits available for the semaphore"); + } + } + + @Override + public Semaphore semaphore() { + return semaphore; + } + } + +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimitConfigBlueprint.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimitConfigBlueprint.java new file mode 100644 index 00000000000..14615fcacea --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimitConfigBlueprint.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.Optional; +import java.util.concurrent.Semaphore; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; + +/** + * Configuration of {@link BasicLimit}. + */ +@Prototype.Blueprint +@Prototype.Configured(value = BasicLimit.TYPE, root = false) +interface BasicLimitConfigBlueprint extends Prototype.Factory { + /** + * Number of permit to allow. + * Defaults to {@value BasicLimit#DEFAULT_LIMIT}. + * When set to {@code 0}, we switch to unlimited. + * + * @return number of permits + */ + @Option.Configured + @Option.DefaultInt(BasicLimit.DEFAULT_LIMIT) + int permits(); + + /** + * Whether the {@link java.util.concurrent.Semaphore} should be {@link java.util.concurrent.Semaphore#isFair()}. + * Defaults to {@code false}. + * + * @return whether this should be a fair semaphore + */ + @Option.Configured + @Option.DefaultBoolean(false) + boolean fair(); + + /** + * Name of this instance. + * + * @return name of the instance + */ + @Option.Default(BasicLimit.TYPE) + String name(); + + /** + * Explicitly configured semaphore. + * Note that if this is set, all other configuration is ignored. + * + * @return semaphore instance + */ + Optional semaphore(); + +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimitProvider.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimitProvider.java new file mode 100644 index 00000000000..05cbeb9c75d --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/BasicLimitProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import io.helidon.common.Weight; +import io.helidon.common.concurrency.limits.spi.LimitProvider; +import io.helidon.common.config.Config; + +/** + * {@link java.util.ServiceLoader} service provider for {@link io.helidon.common.concurrency.limits.BasicLimit} + * limit implementation. + */ +@Weight(90) +public class BasicLimitProvider implements LimitProvider { + /** + * Constructor required by the service loader. + */ + public BasicLimitProvider() { + } + + @Override + public String configKey() { + return BasicLimit.TYPE; + } + + @Override + public Limit create(Config config, String name) { + return BasicLimit.builder() + .config(config) + .name(name) + .build(); + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/IgnoreTaskException.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/IgnoreTaskException.java new file mode 100644 index 00000000000..e6e7458fb89 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/IgnoreTaskException.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.Objects; + +/** + * If this exception is thrown from a limited task within + * {@link Limit#invoke(java.util.concurrent.Callable)}, the + * invocation will be ignored by possible algorithms (for example when considering round-trip timing). + *

+ * This should be used for cases where we never got to execute the intended task. + * This exception should never be thrown by {@link Limit}, it should always + * be translated to a proper return type, or actual exception. + */ +public class IgnoreTaskException extends RuntimeException { + /** + * Desired return value, if we want to ignore the result, yet we still provide valid response. + */ + private final Object returnValue; + /** + * Exception to throw to the user. This is to allow throwing an exception while ignoring it for limits algorithm. + */ + private final Exception exception; + + /** + * Create a new instance with a cause. + * + * @param cause the cause of this exception + */ + public IgnoreTaskException(Exception cause) { + super(Objects.requireNonNull(cause)); + + this.exception = cause; + this.returnValue = null; + } + + /** + * Create a new instance with a return value. + * + * @param returnValue value to return, even though this invocation should be ignored + * return value may be {@code null}. + */ + public IgnoreTaskException(Object returnValue) { + this.exception = null; + this.returnValue = returnValue; + } + + /** + * This is used by limit implementations to either return the value, or throw an exception. + * + * @return the value provided to be the return value + * @param type of the return value + * @throws Exception exception provided by the task + */ + @SuppressWarnings("unchecked") + public T handle() throws Exception { + if (returnValue == null && exception != null) { + throw exception; + } + return (T) returnValue; + } +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java new file mode 100644 index 00000000000..4a2ab4e5b82 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.concurrent.Callable; + +import io.helidon.common.config.NamedService; +import io.helidon.service.registry.Service; + +/** + * Contract for a concurrency limiter. + */ +@Service.Contract +public interface Limit extends NamedService { + /** + * Invoke a callable within the limits of this limiter. + *

+ * {@link io.helidon.common.concurrency.limits.Limit} implementors note: + * Make sure to catch {@link io.helidon.common.concurrency.limits.IgnoreTaskException} from the + * callable, and call its {@link IgnoreTaskException#handle()} to either return the provided result, + * or throw the exception after ignoring the timing for future decisions. + * + * @param callable callable to execute within the limit + * @param the callable return type + * @return result of the callable + * @throws LimitException in case the limiter did not have an available permit + * @throws java.lang.Exception in case the task failed with an exception + */ + T invoke(Callable callable) throws LimitException, Exception; + + /** + * Invoke a runnable within the limits of this limiter. + *

+ * {@link io.helidon.common.concurrency.limits.Limit} implementors note: + * Make sure to catch {@link io.helidon.common.concurrency.limits.IgnoreTaskException} from the + * runnable, and call its {@link IgnoreTaskException#handle()} to either return the provided result, + * or throw the exception after ignoring the timing for future decisions. + * + * @param runnable runnable to execute within the limit + * @throws LimitException in case the limiter did not have an available permit + * @throws java.lang.Exception in case the task failed with an exception + */ + void invoke(Runnable runnable) throws LimitException, Exception; +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitException.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitException.java new file mode 100644 index 00000000000..987d9cf99fa --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitException.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.Objects; + +/** + * A limit was reached and the submitted task cannot be executed. + * + * @see io.helidon.common.concurrency.limits.Limit#invoke(java.util.concurrent.Callable) + * @see io.helidon.common.concurrency.limits.Limit#invoke(Runnable) + */ +public class LimitException extends RuntimeException { + /** + * A new limit exception with a cause. + * + * @param cause cause of the limit reached + */ + public LimitException(Exception cause) { + super(Objects.requireNonNull(cause)); + } + + /** + * A new limit exception with a message. + * + * @param message description of why the limit was reached + */ + public LimitException(String message) { + super(Objects.requireNonNull(message)); + } +} diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/NoopSemaphore.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/NoopSemaphore.java similarity index 79% rename from webserver/webserver/src/main/java/io/helidon/webserver/NoopSemaphore.java rename to common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/NoopSemaphore.java index a20c8790089..b569585bcaf 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/NoopSemaphore.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/NoopSemaphore.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Oracle and/or its affiliates. + * Copyright (c) 2023, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,18 +14,28 @@ * limitations under the License. */ -package io.helidon.webserver; +package io.helidon.common.concurrency.limits; import java.util.Collection; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -/* +/** * A semaphore that does nothing. + * Use {@link #INSTANCE} to get an instance of this semaphore. + * + * @deprecated this is only provided for backward compatibility and will be removed, use + * {@link BasicLimit#create()} to get unlimited limit */ -class NoopSemaphore extends Semaphore { - NoopSemaphore() { +@Deprecated(forRemoval = true, since = "4.2.0") +public class NoopSemaphore extends Semaphore { + /** + * Singleton instance to be used whenever needed. + */ + public static final Semaphore INSTANCE = new NoopSemaphore(); + + private NoopSemaphore() { super(0); } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/SemaphoreLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/SemaphoreLimit.java new file mode 100644 index 00000000000..77332ce397b --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/SemaphoreLimit.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.util.concurrent.Semaphore; + +/** + * The {@link io.helidon.common.concurrency.limits.Limit} is backed by a semaphore, and this provides + * direct access to the semaphore. + * Note that this usage may bypass calculation of limits if the semaphore is used directly. + * This is for backward compatibility only, and will be removed. + * + * @deprecated DO NOT USE except for backward compatibility with semaphore based handling + */ +@Deprecated(since = "4.2.0", forRemoval = true) +public interface SemaphoreLimit { + /** + * Underlying semaphore of this limit. + * + * @return the semaphore instance + * @deprecated this only exists for backward compatibility of Helidon WebServer and will be removed + */ + @Deprecated(forRemoval = true, since = "4.2.0") + Semaphore semaphore(); +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/package-info.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/package-info.java new file mode 100644 index 00000000000..a4efde7abc4 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Concurrency limits API and default implementations. + * + * @see io.helidon.common.concurrency.limits.Limit + * @see io.helidon.common.concurrency.limits.BasicLimit + */ +package io.helidon.common.concurrency.limits; diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/LimitProvider.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/LimitProvider.java new file mode 100644 index 00000000000..c192b734ff9 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/LimitProvider.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits.spi; + +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.config.ConfiguredProvider; +import io.helidon.service.registry.Service; + +/** + * A {@link java.util.ServiceLoader} (and service registry) service provider to discover rate limits. + */ +@Service.Contract +public interface LimitProvider extends ConfiguredProvider { +} diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/package-info.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/package-info.java new file mode 100644 index 00000000000..e88d31d397d --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/spi/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Extension points to create custom concurrency rate limits. + */ +package io.helidon.common.concurrency.limits.spi; diff --git a/common/concurrency/limits/src/main/java/module-info.java b/common/concurrency/limits/src/main/java/module-info.java new file mode 100644 index 00000000000..83eedd25366 --- /dev/null +++ b/common/concurrency/limits/src/main/java/module-info.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Concurrency limits. + * + * @see io.helidon.common.concurrency.limits + */ +module io.helidon.common.concurrency.limits { + requires static io.helidon.service.registry; + + requires io.helidon.builder.api; + requires io.helidon.common; + requires io.helidon.common.config; + + exports io.helidon.common.concurrency.limits; + exports io.helidon.common.concurrency.limits.spi; + + provides io.helidon.common.concurrency.limits.spi.LimitProvider + with io.helidon.common.concurrency.limits.BasicLimitProvider, + io.helidon.common.concurrency.limits.AimdLimitProvider; +} \ No newline at end of file diff --git a/common/concurrency/limits/src/main/resources/META-INF/helidon/service.loader b/common/concurrency/limits/src/main/resources/META-INF/helidon/service.loader new file mode 100644 index 00000000000..e2dd011e9fa --- /dev/null +++ b/common/concurrency/limits/src/main/resources/META-INF/helidon/service.loader @@ -0,0 +1,2 @@ +# List of service contracts we want to support either from service registry, or from service loader +io.helidon.common.concurrency.limits.spi.LimitProvider diff --git a/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java new file mode 100644 index 00000000000..b2f8615eb1d --- /dev/null +++ b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class AimdLimitTest { + @Test + void decreaseOnDrops() { + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(30) + .buildPrototype(); + + AimdLimitImpl limiter = new AimdLimitImpl(config); + + assertThat(limiter.currentLimit(), is(30)); + limiter.updateWithSample(0, 0, 0, false); + assertThat(limiter.currentLimit(), is(27)); + } + + @Test + void decreaseOnTimeoutExceeded() { + Duration timeout = Duration.ofSeconds(1); + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(30) + .timeout(timeout) + .buildPrototype(); + AimdLimitImpl limiter = new AimdLimitImpl(config); + limiter.updateWithSample(0, timeout.toNanos() + 1, 0, true); + assertThat(limiter.currentLimit(), is(27)); + } + + @Test + void increaseOnSuccess() { + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(20) + .buildPrototype(); + AimdLimitImpl limiter = new AimdLimitImpl(config); + limiter.updateWithSample(0, Duration.ofMillis(1).toNanos(), 10, true); + assertThat(limiter.currentLimit(), is(21)); + } + + @Test + void successOverflow() { + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(21) + .maxLimit(21) + .minLimit(0) + .buildPrototype(); + AimdLimitImpl limiter = new AimdLimitImpl(config); + limiter.updateWithSample(0, Duration.ofMillis(1).toNanos(), 10, true); + // after success limit should still be at the max. + assertThat(limiter.currentLimit(), is(21)); + } + + @Test + void testDefault() { + AimdLimitConfig config = AimdLimitConfig.builder() + .minLimit(10) + .initialLimit(10) + .buildPrototype(); + AimdLimitImpl limiter = new AimdLimitImpl(config); + assertThat(limiter.currentLimit(), is(10)); + } + + @Test + void concurrentUpdatesAndReads() throws InterruptedException { + AimdLimitConfig config = AimdLimitConfig.builder() + .initialLimit(1) + .backoffRatio(0.9) + .timeout(Duration.ofMillis(100)) + .minLimit(1) + .maxLimit(200) + .buildPrototype(); + AimdLimitImpl limit = new AimdLimitImpl(config); + + int threadCount = 100; + int operationsPerThread = 1_000; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch endLatch = new CountDownLatch(threadCount); + + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger timeoutCount = new AtomicInteger(0); + AtomicInteger dropCount = new AtomicInteger(0); + + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + startLatch.await(); // Wait for all threads to be ready + for (int j = 0; j < operationsPerThread; j++) { + long startTime = System.nanoTime(); + long rtt = (long) (Math.random() * 200_000_000); // 0-200ms + int concurrentRequests = (int) (Math.random() * limit.currentLimit() * 2); + boolean didDrop = Math.random() < 0.01; // 1% chance of drop + + limit.updateWithSample(startTime, rtt, concurrentRequests, !didDrop); + + if (didDrop) { + dropCount.incrementAndGet(); + } else if (rtt > config.timeout().toNanos()) { + timeoutCount.incrementAndGet(); + } else { + successCount.incrementAndGet(); + } + + // Read the current limit + int currentLimit = limit.currentLimit(); + assertThat(currentLimit, is(greaterThanOrEqualTo(config.minLimit()))); + assertThat(currentLimit, is(lessThanOrEqualTo(config.maxLimit()))); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + endLatch.countDown(); + } + }); + } + + startLatch.countDown(); // Start all threads + boolean finished = endLatch.await(10, TimeUnit.SECONDS); + executor.shutdown(); + + assertThat("Test did not complete in time", finished, is(true)); + + assertThat("Total operations mismatch", + threadCount * operationsPerThread, + is(successCount.get() + timeoutCount.get() + dropCount.get())); + } + + @Test + public void testSemaphoreReleased() throws Exception { + Limit limit = AimdLimit.builder() + .minLimit(5) + .initialLimit(5) + .build(); + + for (int i = 0; i < 5000; i++) { + limit.invoke(() -> {}); + } + } +} diff --git a/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/BasicLimitTest.java b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/BasicLimitTest.java new file mode 100644 index 00000000000..13455aac500 --- /dev/null +++ b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/BasicLimitTest.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; + +public class BasicLimitTest { + @Test + public void testUnlimited() throws InterruptedException { + BasicLimit limiter = BasicLimit.create(); + int concurrency = 5; + CountDownLatch cdl = new CountDownLatch(1); + Lock lock = new ReentrantLock(); + List result = new ArrayList<>(concurrency); + + Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + int index = i; + threads[i] = new Thread(() -> { + try { + limiter.invoke(() -> { + cdl.await(10, TimeUnit.SECONDS); + lock.lock(); + try { + result.add("result_" + index); + } finally { + lock.unlock(); + } + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + for (Thread thread : threads) { + thread.start(); + } + cdl.countDown(); + for (Thread thread : threads) { + thread.join(Duration.ofSeconds(5)); + } + assertThat(result, hasSize(concurrency)); + } + + @Test + public void testLimit() throws Exception { + BasicLimit limiter = BasicLimit.builder() + .permits(1) + .build(); + + int concurrency = 5; + CountDownLatch cdl = new CountDownLatch(1); + Lock lock = new ReentrantLock(); + List result = new ArrayList<>(concurrency); + AtomicInteger failures = new AtomicInteger(); + + Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + int index = i; + threads[i] = new Thread(() -> { + try { + limiter.invoke(() -> { + cdl.await(10, TimeUnit.SECONDS); + lock.lock(); + try { + result.add("result_" + index); + } finally { + lock.unlock(); + } + return null; + }); + } catch (LimitException e) { + failures.incrementAndGet(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + cdl.countDown(); + for (Thread thread : threads) { + thread.join(Duration.ofSeconds(5)); + } + assertThat(failures.get(), is(concurrency - 1)); + assertThat(result, hasSize(1)); + } + + @Test + public void testSemaphoreReleased() throws Exception { + Limit limit = BasicLimit.builder() + .permits(5) + .build(); + + for (int i = 0; i < 5000; i++) { + limit.invoke(() -> {}); + } + } +} diff --git a/common/concurrency/pom.xml b/common/concurrency/pom.xml new file mode 100644 index 00000000000..89002343296 --- /dev/null +++ b/common/concurrency/pom.xml @@ -0,0 +1,39 @@ + + + + + 4.0.0 + + io.helidon.common + helidon-common-project + 4.1.0-SNAPSHOT + ../pom.xml + + + io.helidon.common.concurrency + helidon-common-concurrency-project + Helidon Common Concurrency Project + + pom + + + limits + + diff --git a/common/pom.xml b/common/pom.xml index 0949c39bf92..b62636185ab 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -55,6 +55,7 @@ tls types uri + concurrency diff --git a/fault-tolerance/fault-tolerance/pom.xml b/fault-tolerance/fault-tolerance/pom.xml index 2a88f703817..6063e9d3bed 100644 --- a/fault-tolerance/fault-tolerance/pom.xml +++ b/fault-tolerance/fault-tolerance/pom.xml @@ -41,6 +41,10 @@ io.helidon.common helidon-common-configurable + + io.helidon.common.concurrency + helidon-common-concurrency-limits + + + 4.0.0 + + io.helidon.webserver + helidon-webserver-project + 4.1.0-SNAPSHOT + + + helidon-webserver-concurrency-limits + Helidon WebServer Concurrency Limits + Feature that adds filters for concurrency limits + + + + io.helidon.webserver + helidon-webserver + + + io.helidon.common.concurrency + helidon-common-concurrency-limits + + + io.helidon.common + helidon-common-config + + + helidon-builder-api + io.helidon.builder + + + io.helidon.webserver.testing.junit5 + helidon-webserver-testing-junit5 + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.hamcrest + hamcrest-all + test + + + io.helidon.fault-tolerance + helidon-fault-tolerance + test + + + io.helidon.logging + helidon-logging-jul + test + + + io.helidon.config + helidon-config-yaml + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + io.helidon.config.metadata + helidon-config-metadata-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-apt + ${helidon.version} + + + io.helidon.builder + helidon-builder-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-helidon-copyright + ${helidon.version} + + + + + + io.helidon.config.metadata + helidon-config-metadata-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-apt + ${helidon.version} + + + io.helidon.builder + helidon-builder-codegen + ${helidon.version} + + + io.helidon.codegen + helidon-codegen-helidon-copyright + ${helidon.version} + + + + + + diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeature.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeature.java new file mode 100644 index 00000000000..62f77a2cb10 --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeature.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import java.util.Set; +import java.util.function.Consumer; + +import io.helidon.builder.api.RuntimeType; +import io.helidon.common.Weighted; +import io.helidon.common.config.Config; +import io.helidon.webserver.WebServer; +import io.helidon.webserver.spi.ServerFeature; + +/** + * Server feature that adds limits as filters. + *

+ * When using this feature, the limits operation is enforced within a filter, i.e. after the request + * is accepted. This means it is used only for HTTP requests. + */ +@RuntimeType.PrototypedBy(LimitsFeatureConfig.class) +public class LimitsFeature implements ServerFeature, Weighted, RuntimeType.Api { + /** + * Default weight of this feature. It is the first feature to be registered (above context and access log). + *

+ * Context: 1100 + *

+ * Access Log: 1000 + *

+ * This feature: {@value} + */ + public static final double WEIGHT = 2000; + static final String ID = "limits"; + + private final LimitsFeatureConfig config; + + private LimitsFeature(LimitsFeatureConfig config) { + this.config = config; + } + + /** + * Fluent API builder to set up an instance. + * + * @return a new builder + */ + public static LimitsFeatureConfig.Builder builder() { + return LimitsFeatureConfig.builder(); + } + + /** + * Create a new instance from its configuration. + * + * @param config configuration + * @return a new feature + */ + public static LimitsFeature create(LimitsFeatureConfig config) { + return new LimitsFeature(config); + } + + /** + * Create a new instance customizing its configuration. + * + * @param builderConsumer consumer of configuration + * @return a new feature + */ + public static LimitsFeature create(Consumer builderConsumer) { + return builder() + .update(builderConsumer) + .build(); + } + + /** + * Create a new limits feature with default setup, but enabled. + * + * @return a new feature + */ + public static LimitsFeature create() { + return builder() + .enabled(true) + .build(); + } + + /** + * Create a new context feature with custom setup. + * + * @param config configuration + * @return a new configured feature + */ + public static LimitsFeature create(Config config) { + return builder() + .config(config) + .build(); + } + + @Override + public void setup(ServerFeatureContext featureContext) { + double featureWeight = config.weight(); + // all sockets + Set sockets = config.sockets(); + if (sockets.isEmpty()) { + // configure on default only + featureContext.socket(WebServer.DEFAULT_SOCKET_NAME) + .httpRouting() + .addFeature(new LimitsRoutingFeature(config, featureWeight)); + } else { + // configure on all configured + for (String socket : sockets) { + featureContext.socket(socket) + .httpRouting() + .addFeature(new LimitsRoutingFeature(config, featureWeight)); + } + } + } + + @Override + public String name() { + return config.name(); + } + + @Override + public String type() { + return ID; + } + + @Override + public double weight() { + return config.weight(); + } + + @Override + public LimitsFeatureConfig prototype() { + return config; + } +} diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureConfigBlueprint.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureConfigBlueprint.java new file mode 100644 index 00000000000..f1bbf1afce3 --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureConfigBlueprint.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import java.util.Optional; +import java.util.Set; + +import io.helidon.builder.api.Option; +import io.helidon.builder.api.Prototype; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.spi.LimitProvider; +import io.helidon.webserver.spi.ServerFeatureProvider; + +@Prototype.Blueprint +@Prototype.Configured(value = LimitsFeature.ID, root = false) +@Prototype.Provides(ServerFeatureProvider.class) +interface LimitsFeatureConfigBlueprint extends Prototype.Factory { + /** + * Weight of the context feature. As it is used by other features, the default is quite high: + * {@value LimitsFeature#WEIGHT}. + * + * @return weight of the feature + */ + @Option.DefaultDouble(LimitsFeature.WEIGHT) + @Option.Configured + double weight(); + + /** + * List of sockets to register this feature on. If empty, it would get registered on all sockets. + * + * @return socket names to register on, defaults to empty (all available sockets) + */ + @Option.Configured + Set sockets(); + + /** + * Name of this instance. + * + * @return instance name + */ + @Option.Default(LimitsFeature.ID) + String name(); + + /** + * Concurrency limit to use to limit concurrent execution of incoming requests. + * The default is to have unlimited concurrency. + * + * @return concurrency limit + */ + @Option.Provider(value = LimitProvider.class, discoverServices = false) + @Option.Configured + Optional limit(); + + /** + * Whether this feature is enabled, defaults to {@code true}. + * + * @return whether to enable this feature + */ + @Option.DefaultBoolean(true) + @Option.Configured + boolean enabled(); +} diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureProvider.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureProvider.java new file mode 100644 index 00000000000..5d26cf8e461 --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsFeatureProvider.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import io.helidon.common.Weight; +import io.helidon.common.config.Config; +import io.helidon.webserver.spi.ServerFeatureProvider; + +/** + * {@link java.util.ServiceLoader} provider implementation to automatically register this service. + *

+ * The required configuration (disabled by default): + *

+ * server:
+ *   features:
+ *     limits:
+ *       enabled: true
+ *       limit:
+ *         bulkhead:
+ *         limit: 10
+ *         queue: 100
+ * 
+ */ +@Weight(LimitsFeature.WEIGHT) +public class LimitsFeatureProvider implements ServerFeatureProvider { + /** + * Public constructor required by {@link java.util.ServiceLoader}. + */ + public LimitsFeatureProvider() { + } + + @Override + public String configKey() { + return LimitsFeature.ID; + } + + @Override + public LimitsFeature create(Config config, String name) { + return LimitsFeature.builder() + .config(config) + .name(name) + .build(); + } +} diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsRoutingFeature.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsRoutingFeature.java new file mode 100644 index 00000000000..93c553975e0 --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/LimitsRoutingFeature.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import io.helidon.common.Weighted; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.LimitException; +import io.helidon.http.InternalServerException; +import io.helidon.http.Status; +import io.helidon.webserver.http.FilterChain; +import io.helidon.webserver.http.HttpFeature; +import io.helidon.webserver.http.HttpRouting; +import io.helidon.webserver.http.RoutingRequest; +import io.helidon.webserver.http.RoutingResponse; + +class LimitsRoutingFeature implements HttpFeature, Weighted { + private final double featureWeight; + private final Limit limits; + private final boolean enabled; + + LimitsRoutingFeature(LimitsFeatureConfig config, double featureWeight) { + this.featureWeight = featureWeight; + this.limits = config.limit().orElse(null); + this.enabled = config.enabled(); + } + + @Override + public void setup(HttpRouting.Builder builder) { + if (enabled && limits != null) { + builder.addFilter(this::filter); + } + } + + private void filter(FilterChain chain, RoutingRequest req, RoutingResponse res) { + try { + limits.invoke(() -> { + chain.proceed(); + return null; + }); + } catch (LimitException ex) { + res.status(Status.SERVICE_UNAVAILABLE_503).send(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new InternalServerException("Failed to invoke limit", e); + } + } + + @Override + public double weight() { + return featureWeight; + } +} diff --git a/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/package-info.java b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/package-info.java new file mode 100644 index 00000000000..d1075d62ffb --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/io/helidon/webserver/concurrency/limits/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * An implementation of a feature to protect all server requests with a limit. + */ +package io.helidon.webserver.concurrency.limits; diff --git a/webserver/concurrency-limits/src/main/java/module-info.java b/webserver/concurrency-limits/src/main/java/module-info.java new file mode 100644 index 00000000000..d1d99c99b8a --- /dev/null +++ b/webserver/concurrency-limits/src/main/java/module-info.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Limits feature for Helidon WebServer. + */ +module io.helidon.webserver.concurrency.limits { + requires io.helidon.common; + requires io.helidon.http; + requires io.helidon.webserver; + + requires transitive io.helidon.builder.api; + requires transitive io.helidon.common.config; + requires transitive io.helidon.common.concurrency.limits; + + exports io.helidon.webserver.concurrency.limits; + + provides io.helidon.webserver.spi.ServerFeatureProvider + with io.helidon.webserver.concurrency.limits.LimitsFeatureProvider; + + uses io.helidon.common.concurrency.limits.spi.LimitProvider; +} \ No newline at end of file diff --git a/webserver/concurrency-limits/src/test/java/io/helidon/webserver/concurrency/limits/BulkheadTest.java b/webserver/concurrency-limits/src/test/java/io/helidon/webserver/concurrency/limits/BulkheadTest.java new file mode 100644 index 00000000000..e08ce2386eb --- /dev/null +++ b/webserver/concurrency-limits/src/test/java/io/helidon/webserver/concurrency/limits/BulkheadTest.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.webserver.concurrency.limits; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import io.helidon.http.Status; +import io.helidon.webclient.api.ClientResponseTyped; +import io.helidon.webclient.http1.Http1Client; +import io.helidon.webserver.http.HttpRules; +import io.helidon.webserver.testing.junit5.ServerTest; +import io.helidon.webserver.testing.junit5.SetUpRoute; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@ServerTest +public class BulkheadTest { + private static final CountDownLatch FIRST_ENCOUNTER = new CountDownLatch(1); + private static final CountDownLatch FINISH_LATCH = new CountDownLatch(1); + + private final Http1Client client; + + public BulkheadTest(Http1Client client) { + this.client = client; + } + + @SetUpRoute + public static void route(HttpRules rules) { + rules.get("/greet", (req, res) -> res.send("Hello")) + .get("/wait", (req, res) -> { + FIRST_ENCOUNTER.countDown(); + FINISH_LATCH.await(); + res.send("finished"); + }); + } + + @Test + public void testRequest() { + var response = client.get("/greet") + .request(String.class); + + assertThat(response.status(), is(Status.OK_200)); + assertThat(response.entity(), is("Hello")); + } + + @Test + public void testBulkhead() throws Exception { + Callable> callable = () -> { + return client.get("/wait") + .request(String.class); + }; + try (ExecutorService es = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().factory())) { + var first = es.submit(callable); + FIRST_ENCOUNTER.await(); + var secondResponse = es.submit(callable) + .get(5, TimeUnit.SECONDS); + + assertThat(secondResponse.status(), is(Status.SERVICE_UNAVAILABLE_503)); + FINISH_LATCH.countDown(); + var firstResponse = first.get(5, TimeUnit.SECONDS); + assertThat(firstResponse.status(), is(Status.OK_200)); + assertThat(firstResponse.entity(), is("finished")); + + } + } +} diff --git a/webserver/concurrency-limits/src/test/resources/application.yaml b/webserver/concurrency-limits/src/test/resources/application.yaml new file mode 100644 index 00000000000..4cea2a4738f --- /dev/null +++ b/webserver/concurrency-limits/src/test/resources/application.yaml @@ -0,0 +1,23 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server: + features: + limits: + limit: + bulkhead: + limit: 1 + queue-length: 0 diff --git a/webserver/concurrency-limits/src/test/resources/logging.properties b/webserver/concurrency-limits/src/test/resources/logging.properties new file mode 100644 index 00000000000..eb45bdbe3fa --- /dev/null +++ b/webserver/concurrency-limits/src/test/resources/logging.properties @@ -0,0 +1,21 @@ +# +# Copyright (c) 2024 Oracle and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +handlers=java.util.logging.ConsoleHandler +java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS.%1$tL %5$s%6$s%n +# Global logging level. Can be overridden by specific loggers +.level=INFO +io.helidon.webserver.level=INFO diff --git a/webserver/http2/pom.xml b/webserver/http2/pom.xml index d2758b26274..85461395674 100644 --- a/webserver/http2/pom.xml +++ b/webserver/http2/pom.xml @@ -44,6 +44,10 @@ io.helidon.builder helidon-builder-api
+ + io.helidon.common.concurrency + helidon-common-concurrency-limits + io.helidon.common.features helidon-common-features-api diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java index fa70fa15fb9..6a0bf06b60b 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2Connection.java @@ -27,6 +27,8 @@ import io.helidon.common.buffers.BufferData; import io.helidon.common.buffers.DataReader; +import io.helidon.common.concurrency.limits.BasicLimit; +import io.helidon.common.concurrency.limits.Limit; import io.helidon.common.task.InterruptableTask; import io.helidon.common.tls.TlsUtils; import io.helidon.http.DateTime; @@ -172,9 +174,9 @@ private static void applySetting(Http2Settings.Builder builder, long value, Http } @Override - public void handle(Semaphore requestSemaphore) throws InterruptedException { + public void handle(Limit limit) throws InterruptedException { try { - doHandle(requestSemaphore); + doHandle(limit); } catch (Http2Exception e) { if (state == State.FINISHED) { // already handled @@ -209,6 +211,12 @@ public void handle(Semaphore requestSemaphore) throws InterruptedException { } } + @SuppressWarnings("removal") + @Override + public void handle(Semaphore requestSemaphore) throws InterruptedException { + handle(BasicLimit.create(requestSemaphore)); + } + /** * Client settings, obtained from SETTINGS frame or HTTP/2 upgrade request. * @@ -326,7 +334,7 @@ Http2Settings clientSettings() { return clientSettings; } - private void doHandle(Semaphore requestSemaphore) throws InterruptedException { + private void doHandle(Limit limit) throws InterruptedException { myThread = Thread.currentThread(); while (canRun && state != State.FINISHED) { if (expectPreface && state != State.WRITE_SERVER_SETTINGS) { @@ -341,10 +349,9 @@ private void doHandle(Semaphore requestSemaphore) throws InterruptedException { // no data to read -> connection is closed throw new CloseConnectionException("Connection closed by client", e); } - dispatchHandler(requestSemaphore); - } else { - dispatchHandler(requestSemaphore); } + + dispatchHandler(limit); } if (state != State.FINISHED) { Http2GoAway frame = new Http2GoAway(0, Http2ErrorCode.NO_ERROR, "Idle timeout"); @@ -352,7 +359,7 @@ private void doHandle(Semaphore requestSemaphore) throws InterruptedException { } } - private void dispatchHandler(Semaphore requestSemaphore) { + private void dispatchHandler(Limit limit) { switch (state) { case CONTINUATION -> doContinuation(); case WRITE_SERVER_SETTINGS -> writeServerSettings(); @@ -360,7 +367,7 @@ private void dispatchHandler(Semaphore requestSemaphore) { case SETTINGS -> doSettings(); case ACK_SETTINGS -> ackSettings(); case DATA -> dataFrame(); - case HEADERS -> doHeaders(requestSemaphore); + case HEADERS -> doHeaders(limit); case PRIORITY -> doPriority(); case READ_PUSH_PROMISE -> throw new Http2Exception(Http2ErrorCode.REFUSED_STREAM, "Push promise not supported"); case PING -> pingFrame(); @@ -606,7 +613,7 @@ private void dataFrame() { state = State.READ_FRAME; } - private void doHeaders(Semaphore requestSemaphore) { + private void doHeaders(Limit limit) { int streamId = frameHeader.streamId(); StreamContext streamContext = stream(streamId); @@ -675,7 +682,7 @@ private void doHeaders(Semaphore requestSemaphore) { path, http2Config.validatePath()); stream.prologue(httpPrologue); - stream.requestSemaphore(requestSemaphore); + stream.requestLimit(limit); stream.headers(headers, endOfStream); state = State.READ_FRAME; diff --git a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java index bee1f707da6..edbdd9bfcd7 100644 --- a/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java +++ b/webserver/http2/src/main/java/io/helidon/webserver/http2/Http2ServerStream.java @@ -23,6 +23,9 @@ import java.util.concurrent.Semaphore; import io.helidon.common.buffers.BufferData; +import io.helidon.common.concurrency.limits.BasicLimit; +import io.helidon.common.concurrency.limits.Limit; +import io.helidon.common.concurrency.limits.LimitException; import io.helidon.common.socket.SocketWriterException; import io.helidon.http.DirectHandler; import io.helidon.http.Header; @@ -99,10 +102,9 @@ class Http2ServerStream implements Runnable, Http2Stream { private Http2SubProtocolSelector.SubProtocolHandler subProtocolHandler; private long expectedLength = -1; private HttpPrologue prologue; - // create a semaphore if accessed before we get the one from connection + // create a limit if accessed before we get the one from connection // must be volatile, as it is accessed both from connection thread and from stream thread - private volatile Semaphore requestSemaphore = new Semaphore(1); - private boolean semaphoreAcquired; + private volatile Limit requestLimit = BasicLimit.create(new Semaphore(1)); /** * A new HTTP/2 server stream. @@ -324,9 +326,6 @@ public void run() { } finally { headers = null; subProtocolHandler = null; - if (semaphoreAcquired) { - requestSemaphore.release(); - } } } @@ -425,8 +424,8 @@ void write100Continue() { } } - void requestSemaphore(Semaphore requestSemaphore) { - this.requestSemaphore = requestSemaphore; + void requestLimit(Limit limit) { + this.requestLimit = limit; } void prologue(HttpPrologue prologue) { @@ -516,15 +515,17 @@ private void handle() { streamId, this::readEntityFromPipeline); Http2ServerResponse response = new Http2ServerResponse(this, request); - semaphoreAcquired = requestSemaphore.tryAcquire(); + try { - if (semaphoreAcquired) { - routing.route(ctx, request, response); - } else { + try { + requestLimit.invoke(() -> routing.route(ctx, request, response)); + } catch (LimitException e) { ctx.log(LOGGER, TRACE, "Too many concurrent requests, rejecting request."); response.status(Status.SERVICE_UNAVAILABLE_503) .send("Too Many Concurrent Requests"); response.commit(); + } catch (Exception e) { + throw new CloseConnectionException("Failed to handle request", e); } } finally { request.content().consume(); diff --git a/webserver/http2/src/main/java/module-info.java b/webserver/http2/src/main/java/module-info.java index 73913df2958..108309d0231 100644 --- a/webserver/http2/src/main/java/module-info.java +++ b/webserver/http2/src/main/java/module-info.java @@ -40,6 +40,7 @@ requires transitive io.helidon.http.media; requires transitive io.helidon.http; requires transitive io.helidon.webserver; + requires transitive io.helidon.common.concurrency.limits; exports io.helidon.webserver.http2; exports io.helidon.webserver.http2.spi; diff --git a/webserver/pom.xml b/webserver/pom.xml index da19bb97dad..e7947386427 100644 --- a/webserver/pom.xml +++ b/webserver/pom.xml @@ -46,6 +46,7 @@ testing webserver websocket + concurrency-limits diff --git a/webserver/webserver/pom.xml b/webserver/webserver/pom.xml index b41d29d7e42..77bb9ada598 100644 --- a/webserver/webserver/pom.xml +++ b/webserver/webserver/pom.xml @@ -80,6 +80,10 @@ io.helidon.common.features helidon-common-features + + io.helidon.common.concurrency + helidon-common-concurrency-limits +