diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/ExecutionHolder.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/ExecutionHolder.java index e9c22e016e..e340de468b 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/ExecutionHolder.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/ExecutionHolder.java @@ -7,9 +7,14 @@ import jakarta.enterprise.context.BeforeDestroyed; import jakarta.enterprise.event.Observes; import jakarta.enterprise.event.Reception; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Default; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; +import org.eclipse.microprofile.config.Config; + +import io.smallrye.common.annotation.Identifier; import io.vertx.mutiny.core.Vertx; /** @@ -19,6 +24,8 @@ @ApplicationScoped public class ExecutionHolder { + private static final String REACTIVE_MESSAGING_VERTX_CDI_QUALIFIER = "mp.messaging.connector.vertx.cdi.identifier"; + private boolean internalVertxInstance = false; final Vertx vertx; @@ -40,17 +47,34 @@ public ExecutionHolder(Vertx vertx) { } @Inject - public ExecutionHolder(Instance instanceOfVertx) { - if (instanceOfVertx == null || instanceOfVertx.isUnsatisfied()) { + public ExecutionHolder(@Any Instance instanceOfVertx, Instance config) { + String cdiQualifier = null; + if (config != null && !config.isUnsatisfied()) { + final Config theConfig = config.get(); + cdiQualifier = theConfig.getConfigValue(REACTIVE_MESSAGING_VERTX_CDI_QUALIFIER).getValue(); + } + final Instance vertxInstance; + if (cdiQualifier != null && !cdiQualifier.isEmpty()) { + log.vertxFromCDIQualifier(cdiQualifier); + vertxInstance = instanceOfVertx.select(Identifier.Literal.of(cdiQualifier)); + } else { + vertxInstance = instanceOfVertx.select(Default.Literal.INSTANCE); + } + if (vertxInstance == null || vertxInstance.isUnsatisfied()) { internalVertxInstance = true; this.vertx = Vertx.vertx(); log.vertXInstanceCreated(); } else { - this.vertx = instanceOfVertx.get(); + this.vertx = vertxInstance.get(); } } public Vertx vertx() { return vertx; } + + // this is used in the test + boolean isInternalVertxInstance() { + return internalVertxInstance; + } } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java index 50e65195cb..63b6a2b706 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderLogging.java @@ -147,4 +147,8 @@ public interface ProviderLogging extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 243, value = "Processing method '%s' annotated with @Acknowledgement(POST_PROCESSING), but may not be compatible with post-processing acknowledgement management. You may experience duplicate (negative-)acknowledgement of messages.") void postProcessingNotFullySupported(String methodAsString); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 244, value = "Trying to get Vertx instance using Identifier qualifier: %s.") + void vertxFromCDIQualifier(String cdiQualifier); } diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java index dbe7b0a547..9a20277fb7 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java @@ -4,10 +4,12 @@ import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.io.UncheckedIOException; import java.nio.file.Files; import java.util.Collections; import java.util.List; +import java.util.Properties; import jakarta.enterprise.inject.se.SeContainer; import jakarta.enterprise.inject.se.SeContainerInitializer; @@ -98,6 +100,25 @@ public static void installConfig(String path) { } } + @SuppressWarnings("ResultOfMethodCallIgnored") + public static void installConfigFromProperties(Properties properties) { + releaseConfig(); + File out = new File("target/test-classes/META-INF/microprofile-config.properties"); + if (out.isFile()) { + out.delete(); + } + out.getParentFile().mkdirs(); + try (OutputStream outputStream = Files.newOutputStream(out.toPath())) { + properties.store(outputStream, null); + System.out.println("Installed configuration:"); + List list = Files.readAllLines(out.toPath()); + list.forEach(System.out::println); + System.out.println("---------"); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @BeforeEach public void setUp() { releaseConfig(); diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/VertxInstanceSelectTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/VertxInstanceSelectTest.java new file mode 100644 index 0000000000..1879b12164 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/connectors/VertxInstanceSelectTest.java @@ -0,0 +1,112 @@ +package io.smallrye.reactive.messaging.providers.connectors; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; + +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Default; +import jakarta.enterprise.inject.spi.AfterBeanDiscovery; +import jakarta.enterprise.inject.spi.BeanManager; +import jakarta.enterprise.inject.spi.Extension; + +import org.junit.jupiter.api.Test; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.reactive.messaging.WeldTestBaseWithoutTails; +import io.vertx.mutiny.core.Vertx; + +public class VertxInstanceSelectTest extends WeldTestBaseWithoutTails { + + private static final Properties CONNECTOR_VERTX_CDI_IDENTIFIER_PROPERTIES = new Properties(); + static { + CONNECTOR_VERTX_CDI_IDENTIFIER_PROPERTIES.setProperty("mp.messaging.connector.vertx.cdi.identifier", "vertx"); + } + + private static final Vertx vertxForCDI = Vertx.vertx(); + + // Test the case that 'mp.messaging.connector.vertx.cdi.identifier' is NOT specified and NO Vertx Bean exposed + // from CDI container. The vertx instance should be internal instance. + @Test + public void testNoVertxBeanNoIdentifier() { + initialize(); + ExecutionHolder executionHolder = get(ExecutionHolder.class); + assertThat(executionHolder).isNotNull(); + assertThat(executionHolder.vertx()).isNotNull(); + assertThat(executionHolder.vertx()).isNotEqualTo(vertxForCDI); + assertThat(executionHolder.isInternalVertxInstance()).isTrue(); + } + + // Test the case that 'mp.messaging.connector.vertx.cdi.identifier' IS specified and NO Vertx Bean exposed + // from CDI container. The vertx instance should be internal instance. + @Test + public void testNoVertxBeanWithIdentifier() { + installConfigFromProperties(CONNECTOR_VERTX_CDI_IDENTIFIER_PROPERTIES); + initialize(); + ExecutionHolder executionHolder = get(ExecutionHolder.class); + assertThat(executionHolder).isNotNull(); + assertThat(executionHolder.vertx()).isNotNull(); + assertThat(executionHolder.vertx()).isNotEqualTo(vertxForCDI); + assertThat(executionHolder.isInternalVertxInstance()).isTrue(); + } + + // Test the case that 'mp.messaging.connector.vertx.cdi.identifier' is NOT specified but has Vertx Bean exposed + // from CDI container using Default qualifier. The vertx instance should be vertxForCDI. + @Test + public void testWithVertxBeanDefaultNoIdentifier() { + addExtensionClass(VertxCDIDefaultExtension.class); + initialize(); + ExecutionHolder executionHolder = get(ExecutionHolder.class); + assertThat(executionHolder).isNotNull(); + assertThat(executionHolder.vertx()).isNotNull(); + assertThat(executionHolder.vertx()).isEqualTo(vertxForCDI); + assertThat(executionHolder.isInternalVertxInstance()).isFalse(); + } + + // Test the case that 'mp.messaging.connector.vertx.cdi.identifier' IS specified but has Vertx Bean exposed + // from CDI container using Default qualifier. The vertx instance should be internal instance. + @Test + public void testWithVertxBeanDefaultWithIdentifier() { + installConfigFromProperties(CONNECTOR_VERTX_CDI_IDENTIFIER_PROPERTIES); + addExtensionClass(VertxCDIDefaultExtension.class); + initialize(); + ExecutionHolder executionHolder = get(ExecutionHolder.class); + assertThat(executionHolder).isNotNull(); + assertThat(executionHolder.vertx()).isNotNull(); + assertThat(executionHolder.vertx()).isNotEqualTo(vertxForCDI); + assertThat(executionHolder.isInternalVertxInstance()).isTrue(); + } + + // Test the case that 'mp.messaging.connector.vertx.cdi.identifier' is specified and has Vertx Bean exposed + // from CDI container using the Identifier qualifier. The vertx instance should be vertxForCDI. + @Test + public void testWithVertxBeanIdentifierWithIdentifier() { + installConfigFromProperties(CONNECTOR_VERTX_CDI_IDENTIFIER_PROPERTIES); + addExtensionClass(VertxCDIIdentifierExtension.class); + initialize(); + ExecutionHolder executionHolder = get(ExecutionHolder.class); + assertThat(executionHolder).isNotNull(); + assertThat(executionHolder.vertx()).isNotNull(); + assertThat(executionHolder.vertx()).isEqualTo(vertxForCDI); + assertThat(executionHolder.isInternalVertxInstance()).isFalse(); + } + + public static class VertxCDIDefaultExtension implements Extension { + public void registerVertxBean(@Observes AfterBeanDiscovery abd, BeanManager beanManager) { + abd.addBean() + .beanClass(Vertx.class).types(Vertx.class) + .addQualifiers(Any.Literal.INSTANCE, Default.Literal.INSTANCE) + .createWith(cc -> vertxForCDI); + } + } + + public static class VertxCDIIdentifierExtension implements Extension { + public void registerVertxBean(@Observes AfterBeanDiscovery abd, BeanManager beanManager) { + abd.addBean() + .beanClass(Vertx.class).types(Vertx.class) + .addQualifiers(Any.Literal.INSTANCE, Identifier.Literal.of("vertx")) + .createWith(cc -> vertxForCDI); + } + } +}