Skip to content

Commit

Permalink
Bumping versions
Browse files Browse the repository at this point in the history
  • Loading branch information
spring-builds committed Jan 23, 2025
1 parent abddcd8 commit 0618d31
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,24 @@ public ReactiveResilience4jBulkheadProvider reactiveBulkheadProvider(BulkheadReg

if (!enableSemaphoreDefaultBulkhead) {
LoggerFactory.getLogger(Resilience4jBulkheadConfiguration.class)
.warn("Ignoring 'spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead=false'. " +
"ReactiveResilience4jBulkheadProvider only supports SemaphoreBulkhead.");
.warn("Ignoring 'spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead=false'. "
+ "ReactiveResilience4jBulkheadProvider only supports SemaphoreBulkhead.");
}

ReactiveResilience4jBulkheadProvider reactiveResilience4JCircuitBreaker =
new ReactiveResilience4jBulkheadProvider(bulkheadRegistry);
ReactiveResilience4jBulkheadProvider reactiveResilience4JCircuitBreaker = new ReactiveResilience4jBulkheadProvider(
bulkheadRegistry);
bulkheadCustomizers.forEach(customizer -> customizer.customize(reactiveResilience4JCircuitBreaker));
return reactiveResilience4JCircuitBreaker;
}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = {"reactor.core.publisher.Mono", "reactor.core.publisher.Flux",
@ConditionalOnClass(name = { "reactor.core.publisher.Mono", "reactor.core.publisher.Flux",
"io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetrics",
"io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetricsPublisher"})
@ConditionalOnBean({MeterRegistry.class})
@ConditionalOnMissingBean({TaggedCircuitBreakerMetricsPublisher.class})
"io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetricsPublisher" })
@ConditionalOnBean({ MeterRegistry.class })
@ConditionalOnMissingBean({ TaggedCircuitBreakerMetricsPublisher.class })
public static class MicrometerReactiveResilience4JCustomizerConfiguration {

@Autowired(required = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer, boolean disableTimeLimiter) {
this(id, groupName, config, circuitBreakerRegistry, timeLimiterRegistry, circuitBreakerCustomizer, null, disableTimeLimiter);
this(id, groupName, config, circuitBreakerRegistry, timeLimiterRegistry, circuitBreakerCustomizer, null,
disableTimeLimiter);
}

public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public ReactiveCircuitBreaker create(String id, String groupName) {
boolean isDisableTimeLimiter = ConfigurationPropertiesUtils
.isDisableTimeLimiter(this.resilience4JConfigurationProperties, id, groupName);
return new ReactiveResilience4JCircuitBreaker(id, groupName, config, circuitBreakerRegistry,
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)),
bulkheadProvider, isDisableTimeLimiter);
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)), bulkheadProvider,
isDisableTimeLimiter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,15 @@ public class ReactiveResilience4jBulkheadProvider {

private final BulkheadRegistry bulkheadRegistry;

private final ConcurrentHashMap<String, Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration>
configurations = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration> configurations = new ConcurrentHashMap<>();

private Function<String, Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration> defaultConfiguration;

public ReactiveResilience4jBulkheadProvider(BulkheadRegistry bulkheadRegistry) {
this.bulkheadRegistry = bulkheadRegistry;
this.defaultConfiguration = id -> new Resilience4jBulkheadConfigurationBuilder()
.bulkheadConfig(this.bulkheadRegistry.getDefaultConfig())
.build();
.bulkheadConfig(this.bulkheadRegistry.getDefaultConfig())
.build();
}

public void configureDefault(
Expand All @@ -69,7 +68,7 @@ public void configure(Consumer<Resilience4jBulkheadConfigurationBuilder> consume
public void addBulkheadCustomizer(Consumer<Bulkhead> customizer, String... ids) {
for (String id : ids) {
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
.computeIfAbsent(id, defaultConfiguration);
.computeIfAbsent(id, defaultConfiguration);
Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig());
customizer.accept(bulkhead);
}
Expand All @@ -81,24 +80,25 @@ public BulkheadRegistry getBulkheadRegistry() {

public <T> Mono<T> decorateMono(String id, Map<String, String> tags, Mono<T> mono) {
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
.computeIfAbsent(id, this::getConfiguration);
.computeIfAbsent(id, this::getConfiguration);
Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig(), tags);
return mono.transformDeferred(BulkheadOperator.of(bulkhead));
}

public <T> Flux<T> decorateFlux(String id, Map<String, String> tags, Flux<T> flux) {
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
.computeIfAbsent(id, this::getConfiguration);
.computeIfAbsent(id, this::getConfiguration);
Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig(), tags);
return flux.transformDeferred(BulkheadOperator.of(bulkhead));
}

private Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration getConfiguration(String id) {
Resilience4jBulkheadConfigurationBuilder builder = new Resilience4jBulkheadConfigurationBuilder();
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration defaultConfiguration =
this.defaultConfiguration.apply(id);
Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration defaultConfiguration = this.defaultConfiguration
.apply(id);
Optional<BulkheadConfig> bulkheadConfiguration = bulkheadRegistry.getConfiguration(id);
builder.bulkheadConfig(bulkheadConfiguration.orElse(defaultConfiguration.getBulkheadConfig()));
return builder.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,34 +112,28 @@ void testInstanceConfigurationOverridesConfigAndCustomizerProperties() {

@Test
void testReactiveInstanceConfigurationOverridesConfigAndCustomizerProperties() {
new WebApplicationContextRunner()
.withUserConfiguration(Application.class)
.withPropertyValues(
"resilience4j.bulkhead.configs.testme.max-concurrent-calls=30",
"resilience4j.bulkhead.instances.testme.max-concurrent-calls=40"
)
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.bulkhead.configs.testme.max-concurrent-calls=30",
"resilience4j.bulkhead.instances.testme.max-concurrent-calls=40")
.run(context -> {
final String id = "testme";

ReactiveResilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(ReactiveResilience4JCircuitBreakerFactory.class);

ReactiveResilience4jBulkheadProvider bulkheadProvider = context.getBean(ReactiveResilience4jBulkheadProvider.class);
ReactiveResilience4jBulkheadProvider bulkheadProvider = context
.getBean(ReactiveResilience4jBulkheadProvider.class);

bulkheadProvider.configure(builder -> {
BulkheadConfig bulkheadConfig = BulkheadConfig.custom()
.maxConcurrentCalls(50)
.build();
BulkheadConfig bulkheadConfig = BulkheadConfig.custom().maxConcurrentCalls(50).build();
builder.bulkheadConfig(bulkheadConfig);
}, id);

BulkheadRegistry bulkheadRegistry = context.getBean(BulkheadRegistry.class);

Mono<String> result = resilience4JCircuitBreakerFactory.create(id).run(Mono.just("test"));

StepVerifier.create(result)
.expectNext("test")
.verifyComplete();
StepVerifier.create(result).expectNext("test").verifyComplete();

Optional<Bulkhead> bulkheadOptional = bulkheadRegistry.find(id);
assertThat(bulkheadOptional).isPresent();
Expand Down Expand Up @@ -176,33 +170,27 @@ void testCustomizerConfigurationOverridesConfigProperties() {

@Test
void testReactiveCustomizerConfigurationOverridesConfigProperties() {
new WebApplicationContextRunner()
.withUserConfiguration(Application.class)
.withPropertyValues(
"resilience4j.bulkhead.configs.testme.max-concurrent-calls=30"
)
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.bulkhead.configs.testme.max-concurrent-calls=30")
.run(context -> {
final String id = "testme";

ReactiveResilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(ReactiveResilience4JCircuitBreakerFactory.class);

ReactiveResilience4jBulkheadProvider bulkheadProvider = context.getBean(ReactiveResilience4jBulkheadProvider.class);
ReactiveResilience4jBulkheadProvider bulkheadProvider = context
.getBean(ReactiveResilience4jBulkheadProvider.class);

bulkheadProvider.configure(builder -> {
BulkheadConfig bulkheadConfig = BulkheadConfig.custom()
.maxConcurrentCalls(50)
.build();
BulkheadConfig bulkheadConfig = BulkheadConfig.custom().maxConcurrentCalls(50).build();
builder.bulkheadConfig(bulkheadConfig);
}, id);

BulkheadRegistry bulkheadRegistry = context.getBean(BulkheadRegistry.class);

Mono<String> result = resilience4JCircuitBreakerFactory.create(id).run(Mono.just("test"));

StepVerifier.create(result)
.expectNext("test")
.verifyComplete();
StepVerifier.create(result).expectNext("test").verifyComplete();

Optional<Bulkhead> bulkheadOptional = bulkheadRegistry.find(id);
assertThat(bulkheadOptional).isPresent();
Expand Down Expand Up @@ -286,28 +274,31 @@ void configureDefaultOverridesPropertyDefaultForThreadpool() {
void configureDefaultOverridesPropertyDefaultForReactiveBulkhead() {
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.bulkhead.config.default.max-concurrent-calls=30",
"resilience4j.threadpool.config.default.queueCapacity=30"/*
* ,
* "resilience4j.bulkhead.instances.testme.max-wait-duration=30s",
* "resilience4j.threadPoolBulkHead.instances.testme.core-threadpool-size=1"
*/)
"resilience4j.threadpool.config.default.queueCapacity=30"/*
* ,
* "resilience4j.bulkhead.instances.testme.max-wait-duration=30s",
* "resilience4j.threadPoolBulkHead.instances.testme.core-threadpool-size=1"
*/)
.run(context -> {
final String id = "testme";
ReactiveResilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(ReactiveResilience4JCircuitBreakerFactory.class);
ReactiveResilience4jBulkheadProvider bulkheadProvider = context.getBean(ReactiveResilience4jBulkheadProvider.class);
ReactiveResilience4jBulkheadProvider bulkheadProvider = context
.getBean(ReactiveResilience4jBulkheadProvider.class);
bulkheadProvider.configureDefault(bulkheadId -> new Resilience4jBulkheadConfigurationBuilder()
.bulkheadConfig(BulkheadConfig.custom().maxConcurrentCalls(50).maxWaitDuration(Duration.ofSeconds(10)).build())
.bulkheadConfig(BulkheadConfig.custom()
.maxConcurrentCalls(50)
.maxWaitDuration(Duration.ofSeconds(10))
.build())
.build());
BulkheadRegistry bulkheadRegistry = context.getBean(BulkheadRegistry.class);
Mono<String> result = resilience4JCircuitBreakerFactory.create(id).run(Mono.just("test"));
StepVerifier.create(result)
.expectNext("test")
.verifyComplete();
StepVerifier.create(result).expectNext("test").verifyComplete();
Optional<Bulkhead> bulkheadOptional = bulkheadRegistry.find(id);
assertThat(bulkheadOptional).isPresent();
assertThat(bulkheadOptional.get().getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(50);
assertThat(bulkheadOptional.get().getBulkheadConfig().getMaxWaitDuration()).isEqualTo(Duration.ofSeconds(10));
assertThat(bulkheadOptional.get().getBulkheadConfig().getMaxWaitDuration())
.isEqualTo(Duration.ofSeconds(10));
});

}
Expand Down Expand Up @@ -342,18 +333,16 @@ void configureDefaultOverridesPropertyDefaultForSemaphore() {

@Test
void configureDefaultOverridesPropertyDefaultForReactiveSemaphore() {
new WebApplicationContextRunner()
.withUserConfiguration(Application.class)
.withPropertyValues(
"resilience4j.bulkhead.config.default.max-concurrent-calls=30",
"spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead=true"
)
new WebApplicationContextRunner().withUserConfiguration(Application.class)
.withPropertyValues("resilience4j.bulkhead.config.default.max-concurrent-calls=30",
"spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead=true")
.run(context -> {
final String id = "testme";

ReactiveResilience4JCircuitBreakerFactory resilience4JCircuitBreakerFactory = context
.getBean(ReactiveResilience4JCircuitBreakerFactory.class);
ReactiveResilience4jBulkheadProvider bulkheadProvider = context.getBean(ReactiveResilience4jBulkheadProvider.class);
ReactiveResilience4jBulkheadProvider bulkheadProvider = context
.getBean(ReactiveResilience4jBulkheadProvider.class);

bulkheadProvider.configureDefault(bulkheadId -> new Resilience4jBulkheadConfigurationBuilder()
.bulkheadConfig(BulkheadConfig.custom().maxConcurrentCalls(40).build())
Expand All @@ -363,9 +352,7 @@ void configureDefaultOverridesPropertyDefaultForReactiveSemaphore() {

Mono<String> result = resilience4JCircuitBreakerFactory.create(id).run(Mono.just("test"));

StepVerifier.create(result)
.expectNext("test")
.verifyComplete();
StepVerifier.create(result).expectNext("test").verifyComplete();

Optional<Bulkhead> semaphoreBulkhead = bulkheadRegistry.find(id);
assertThat(semaphoreBulkhead).isPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public class ReactiveResilience4JAutoConfigurationWithoutBulkheadTest {
public void testWithoutBulkhead() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder().web(WebApplicationType.NONE)
.sources(TestApp.class)
.run()
) {
.run()) {
assertThat(context.containsBean("reactiveBulkheadProvider")).isFalse();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
public class ReactiveResilience4JAutoConfigurationWithoutMetricsTest {

static ReactiveResilience4JCircuitBreakerFactory circuitBreakerFactory = spy(
new ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry.ofDefaults(),
TimeLimiterRegistry.ofDefaults(), new Resilience4JConfigurationProperties()));
new ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry.ofDefaults(),
TimeLimiterRegistry.ofDefaults(), new Resilience4JConfigurationProperties()));

@Test
public void testWithoutMetrics() {
Expand All @@ -61,8 +61,7 @@ public void testWithoutMetrics() {

@Test
public void testProviderCreatedWhenEnableSemaphoreDefaultBulkheadFalse() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder()
.web(WebApplicationType.NONE)
try (ConfigurableApplicationContext context = new SpringApplicationBuilder().web(WebApplicationType.NONE)
.sources(TestApp.class)
.properties("spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead=false")
.run()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,14 @@ public Customizer<ReactiveResilience4JCircuitBreakerFactory> reactiveSlowBulkhea

@Bean
public Customizer<ReactiveResilience4jBulkheadProvider> reactiveBulkheadProviderCustomizer() {
return provider -> provider.addBulkheadCustomizer(bulkhead -> { }, SLOW_BULKHEAD);
return provider -> provider.addBulkheadCustomizer(bulkhead -> {
}, SLOW_BULKHEAD);
}

enum CompletionStatus {

SUCCESS, INTERRUPTED

}

@Service
Expand All @@ -100,17 +103,17 @@ public static class DemoReactiveService {
}

public Mono<CompletionStatus> bulkheadWithDelay(long delay) {
return slowBulkheadCircuitBreaker.run(
Mono.just(CompletionStatus.SUCCESS)
.delayElement(Duration.ofMillis(delay)),
throwable -> {
return slowBulkheadCircuitBreaker
.run(Mono.just(CompletionStatus.SUCCESS).delayElement(Duration.ofMillis(delay)), throwable -> {
if (throwable instanceof TimeoutException || throwable instanceof BulkheadFullException) {
return Mono.just(CompletionStatus.INTERRUPTED);
}
return Mono.error(throwable);
}
);
});
}

}

}

}
Loading

0 comments on commit 0618d31

Please sign in to comment.