Skip to content

Commit

Permalink
Merge pull request #231 from yavor300/add-reactive-resilience4j-bulkh…
Browse files Browse the repository at this point in the history
…ead-provider

Add support for reactive Resilience4jBulkheadProvider to provide bulkhead support for reactive operations (Mono and Flux).
  • Loading branch information
ryanjbaxter authored Jan 23, 2025
2 parents 5c0c8e7 + 079f231 commit abddcd8
Show file tree
Hide file tree
Showing 12 changed files with 942 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,41 @@ public Customizer<Resilience4jBulkheadProvider> defaultBulkheadCustomizer() {
}
----

== Reactive Bulkhead Pattern Supporting

If you are using reactive programming with Spring Cloud CircuitBreaker, you can leverage the `ReactiveResilience4jBulkheadProvider` to support the Bulkhead pattern in reactive pipelines.
This provider decorates `Mono` and `Flux` instances to ensure bulkhead constraints are applied during reactive operations.

Spring Cloud CircuitBreaker Resilience4j reactive support only uses the `SemaphoreBulkhead`.
If the property `spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead` is set to `false`, a warning will be logged, and the `ReactiveResilience4jBulkheadProvider` will still use the `SemaphoreBulkhead`.

== Configuring Reactive Bulkhead

The `ReactiveResilience4jBulkheadProvider` can be customized using a `Customizer` bean, as shown below:

[source,java]
----
@Bean
public Customizer<ReactiveResilience4jBulkheadProvider> reactiveBulkheadCustomizer() {
return provider -> provider.configureDefault(id -> new Resilience4jBulkheadConfigurationBuilder()
.bulkheadConfig(BulkheadConfig.custom().maxConcurrentCalls(4).build())
.build());
}
----

You can also add individual bulkhead configurations for specific use cases:

[source,java]
----
@Bean
public Customizer<ReactiveResilience4jBulkheadProvider> reactiveSpecificBulkheadCustomizer() {
return provider -> provider.configure(builder -> {
builder.bulkheadConfig(BulkheadConfig.custom()
.maxConcurrentCalls(2)
.build());
}, "serviceBulkhead");
}
----

For more details, see the https://resilience4j.readme.io/docs/examples-1#decorate-mono-or-flux-with-a-bulkhead[Resilience4j Reactive Bulkhead Examples].

Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@
import java.util.ArrayList;
import java.util.List;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.micrometer.tagged.TaggedBulkheadMetrics;
import io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetrics;
import io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetricsPublisher;
import io.github.resilience4j.timelimiter.TimeLimiterRegistry;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.annotation.PostConstruct;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand All @@ -41,6 +46,7 @@
* @author Ryan Baxter
* @author Eric Bussieres
* @author Thomas Vitale
* @author Yavor Chamov
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = { "reactor.core.publisher.Mono", "reactor.core.publisher.Flux",
Expand All @@ -57,24 +63,55 @@ public class ReactiveResilience4JAutoConfiguration {
@ConditionalOnMissingBean(ReactiveCircuitBreakerFactory.class)
public ReactiveResilience4JCircuitBreakerFactory reactiveResilience4JCircuitBreakerFactory(
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
@Autowired(required = false) ReactiveResilience4jBulkheadProvider bulkheadProvider,
Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
ReactiveResilience4JCircuitBreakerFactory factory = new ReactiveResilience4JCircuitBreakerFactory(
circuitBreakerRegistry, timeLimiterRegistry, resilience4JConfigurationProperties);
circuitBreakerRegistry, timeLimiterRegistry, bulkheadProvider, resilience4JConfigurationProperties);
customizers.forEach(customizer -> customizer.customize(factory));
return factory;
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = { "reactor.core.publisher.Mono", "reactor.core.publisher.Flux",
@ConditionalOnClass(Bulkhead.class)
@ConditionalOnProperty(value = "spring.cloud.circuitbreaker.bulkhead.resilience4j.enabled", matchIfMissing = true)
public static class Resilience4jBulkheadConfiguration {

@Autowired(required = false)
private List<Customizer<ReactiveResilience4jBulkheadProvider>> bulkheadCustomizers = new ArrayList<>();

@Value("${spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead:true}")
private boolean enableSemaphoreDefaultBulkhead;

@Bean
public ReactiveResilience4jBulkheadProvider reactiveBulkheadProvider(BulkheadRegistry bulkheadRegistry) {

if (!enableSemaphoreDefaultBulkhead) {
LoggerFactory.getLogger(Resilience4jBulkheadConfiguration.class)
.warn("Ignoring 'spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead=false'. " +
"ReactiveResilience4jBulkheadProvider only supports SemaphoreBulkhead.");
}

ReactiveResilience4jBulkheadProvider reactiveResilience4JCircuitBreaker =
new ReactiveResilience4jBulkheadProvider(bulkheadRegistry);
bulkheadCustomizers.forEach(customizer -> customizer.customize(reactiveResilience4JCircuitBreaker));
return reactiveResilience4JCircuitBreaker;
}
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = {"reactor.core.publisher.Mono", "reactor.core.publisher.Flux",
"io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetrics",
"io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetricsPublisher" })
@ConditionalOnBean({ MeterRegistry.class })
@ConditionalOnMissingBean({ TaggedCircuitBreakerMetricsPublisher.class })
"io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetricsPublisher"})
@ConditionalOnBean({MeterRegistry.class})
@ConditionalOnMissingBean({TaggedCircuitBreakerMetricsPublisher.class})
public static class MicrometerReactiveResilience4JCustomizerConfiguration {

@Autowired(required = false)
private ReactiveResilience4JCircuitBreakerFactory factory;

@Autowired(required = false)
private ReactiveResilience4jBulkheadProvider bulkheadProvider;

@Autowired(required = false)
private TaggedCircuitBreakerMetrics taggedCircuitBreakerMetrics;

Expand All @@ -90,6 +127,9 @@ public void init() {
}
taggedCircuitBreakerMetrics.bindTo(meterRegistry);
}
if (bulkheadProvider != null) {
TaggedBulkheadMetrics.ofBulkheadRegistry(bulkheadProvider.getBulkheadRegistry()).bindTo(meterRegistry);
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@
* @author Ryan Baxter
* @author Thomas Vitale
* @author 荒
* @author Yavor Chamov
*/
public class ReactiveResilience4JCircuitBreaker implements ReactiveCircuitBreaker {

private final String id;

private final String groupName;

private final ReactiveResilience4jBulkheadProvider bulkheadProvider;

private final io.github.resilience4j.circuitbreaker.CircuitBreakerConfig circuitBreakerConfig;

private final CircuitBreakerRegistry circuitBreakerRegistry;
Expand All @@ -70,24 +73,42 @@ public ReactiveResilience4JCircuitBreaker(String id, String groupName,
this(id, groupName, config, circuitBreakerRegistry, timeLimiterRegistry, circuitBreakerCustomizer, false);
}

@Deprecated
public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer, boolean disableTimeLimiter) {
this(id, groupName, config, circuitBreakerRegistry, timeLimiterRegistry, circuitBreakerCustomizer, null, disableTimeLimiter);
}

public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer,
ReactiveResilience4jBulkheadProvider bulkheadProvider, boolean disableTimeLimiter) {
this.id = id;
this.groupName = groupName;
this.circuitBreakerConfig = config.getCircuitBreakerConfig();
this.circuitBreakerRegistry = circuitBreakerRegistry;
this.circuitBreakerCustomizer = circuitBreakerCustomizer;
this.timeLimiterConfig = config.getTimeLimiterConfig();
this.timeLimiterRegistry = timeLimiterRegistry;
this.bulkheadProvider = bulkheadProvider;
this.disableTimeLimiter = disableTimeLimiter;
}

@Override
public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {
final Map<String, String> tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
Tuple2<CircuitBreaker, Optional<TimeLimiter>> tuple = buildCircuitBreakerAndTimeLimiter();
Mono<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()));
Mono<T> toReturn;
if (bulkheadProvider != null) {
toReturn = bulkheadProvider.decorateMono(groupName, tags, toRun);
}
else {
toReturn = toRun;
}
toReturn = toReturn.transform(CircuitBreakerOperator.of(tuple.getT1()));
if (tuple.getT2().isPresent()) {
final Duration timeoutDuration = tuple.getT2().get().getTimeLimiterConfig().getTimeoutDuration();
toReturn = toReturn.timeout(timeoutDuration)
Expand All @@ -105,8 +126,16 @@ public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {

@Override
public <T> Flux<T> run(Flux<T> toRun, Function<Throwable, Flux<T>> fallback) {
final Map<String, String> tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
Tuple2<CircuitBreaker, Optional<TimeLimiter>> tuple = buildCircuitBreakerAndTimeLimiter();
Flux<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()));
Flux<T> toReturn;
if (bulkheadProvider != null) {
toReturn = bulkheadProvider.decorateFlux(groupName, tags, toRun);
}
else {
toReturn = toRun;
}
toReturn = toReturn.transform(CircuitBreakerOperator.of(tuple.getT1()));
if (tuple.getT2().isPresent()) {
final Duration timeoutDuration = tuple.getT2().get().getTimeLimiterConfig().getTimeoutDuration();
toReturn = toReturn.timeout(timeoutDuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
* @author Ryan Baxter
* @author Thomas Vitale
* @author 荒
* @author Yavor Chamov
*/
public class ReactiveResilience4JCircuitBreakerFactory extends
ReactiveCircuitBreakerFactory<Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration, Resilience4JConfigBuilder> {

private final ReactiveResilience4jBulkheadProvider bulkheadProvider;

private Function<String, Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration> defaultConfiguration;

private CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
Expand All @@ -53,13 +56,21 @@ public class ReactiveResilience4JCircuitBreakerFactory extends
@Deprecated
public ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry) {
this(circuitBreakerRegistry, timeLimiterRegistry, null);
this(circuitBreakerRegistry, timeLimiterRegistry, null, null);
}

@Deprecated
public ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry,
Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
this(circuitBreakerRegistry, timeLimiterRegistry, null, resilience4JConfigurationProperties);
}

public ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry, ReactiveResilience4jBulkheadProvider bulkheadProvider,
Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
this.circuitBreakerRegistry = circuitBreakerRegistry;
this.bulkheadProvider = bulkheadProvider;
this.timeLimiterRegistry = timeLimiterRegistry;
this.defaultConfiguration = id -> new Resilience4JConfigBuilder(id)
.circuitBreakerConfig(this.circuitBreakerRegistry.getDefaultConfig())
Expand Down Expand Up @@ -106,7 +117,8 @@ public ReactiveCircuitBreaker create(String id, String groupName) {
boolean isDisableTimeLimiter = ConfigurationPropertiesUtils
.isDisableTimeLimiter(this.resilience4JConfigurationProperties, id, groupName);
return new ReactiveResilience4JCircuitBreaker(id, groupName, config, circuitBreakerRegistry,
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)), isDisableTimeLimiter);
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)),
bulkheadProvider, isDisableTimeLimiter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.circuitbreaker.resilience4j;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.bulkhead.BulkheadRegistry;
import io.github.resilience4j.reactor.bulkhead.operator.BulkheadOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

/**
* @author Yavor Chamov
*/
public class ReactiveResilience4jBulkheadProvider {

private final BulkheadRegistry bulkheadRegistry;

private final ConcurrentHashMap<String, Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration>
configurations = new ConcurrentHashMap<>();

private Function<String, Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration> defaultConfiguration;

public ReactiveResilience4jBulkheadProvider(BulkheadRegistry bulkheadRegistry) {
this.bulkheadRegistry = bulkheadRegistry;
this.defaultConfiguration = id -> new Resilience4jBulkheadConfigurationBuilder()
.bulkheadConfig(this.bulkheadRegistry.getDefaultConfig())
.build();
}

public void configureDefault(
@NonNull Function<String, Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration> defaultConfiguration) {
Assert.notNull(defaultConfiguration, "Default configuration must not be null");
this.defaultConfiguration = defaultConfiguration;
}

public void configure(Consumer<Resilience4jBulkheadConfigurationBuilder> consumer, String... ids) {
for (String id : ids) {
Resilience4jBulkheadConfigurationBuilder builder = new Resilience4jBulkheadConfigurationBuilder();
consumer.accept(builder);
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = builder.build();
configurations.put(id, configuration);
}
}

public void addBulkheadCustomizer(Consumer<Bulkhead> customizer, String... ids) {
for (String id : ids) {
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
.computeIfAbsent(id, defaultConfiguration);
Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig());
customizer.accept(bulkhead);
}
}

public BulkheadRegistry getBulkheadRegistry() {
return bulkheadRegistry;
}

public <T> Mono<T> decorateMono(String id, Map<String, String> tags, Mono<T> mono) {
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
.computeIfAbsent(id, this::getConfiguration);
Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig(), tags);
return mono.transformDeferred(BulkheadOperator.of(bulkhead));
}

public <T> Flux<T> decorateFlux(String id, Map<String, String> tags, Flux<T> flux) {
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
.computeIfAbsent(id, this::getConfiguration);
Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig(), tags);
return flux.transformDeferred(BulkheadOperator.of(bulkhead));
}

private Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration getConfiguration(String id) {
Resilience4jBulkheadConfigurationBuilder builder = new Resilience4jBulkheadConfigurationBuilder();
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration defaultConfiguration =
this.defaultConfiguration.apply(id);
Optional<BulkheadConfig> bulkheadConfiguration = bulkheadRegistry.getConfiguration(id);
builder.bulkheadConfig(bulkheadConfiguration.orElse(defaultConfiguration.getBulkheadConfig()));
return builder.build();
}
}
Loading

0 comments on commit abddcd8

Please sign in to comment.