Skip to content

Commit

Permalink
GH-869 - Polishing.
Browse files Browse the repository at this point in the history
Tweak header population to use the raw routing target. Expand test case to make sure it's retained as message header.
  • Loading branch information
odrotbohm committed Oct 13, 2024
1 parent 41aad24 commit a8835b2
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 24 deletions.
5 changes: 5 additions & 0 deletions spring-modulith-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<artifactId>spring-modulith-events-kafka</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-messaging</artifactId>
<version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-mongodb</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion spring-modulith-events/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
<module>spring-modulith-events-jms</module>
<module>spring-modulith-events-jpa</module>
<module>spring-modulith-events-kafka</module>
<module>spring-modulith-events-messaging</module>
<module>spring-modulith-events-mongodb</module>
<module>spring-modulith-events-neo4j</module>
<module>spring-modulith-events-messaging</module>
</modules>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ RoutingTarget verify() {
*/
@Override
public String toString() {
return target + "::" + (key == null ? "" : key);
return target + (key == null ? "" : "::" + key);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,28 @@
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.config.EventExternalizationAutoConfiguration;
import org.springframework.modulith.events.support.BrokerRouting;
import org.springframework.modulith.events.support.DelegatingEventExternalizer;

/**
* Auto-configuration to set up a {@link DelegatingEventExternalizer} to externalize events to a Spring Messaging
* {@link MessageChannel message channel}.
* {@link MessageChannel}.
*
* @author Josh Long
* @author Oliver Drotbohm
*/
@AutoConfiguration
@AutoConfigureAfter(EventExternalizationAutoConfiguration.class)
@ConditionalOnClass(MessageChannel.class)
@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled",
havingValue = "true",
@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled", havingValue = "true",
matchIfMissing = true)
class SpringMessagingEventExternalizerConfiguration {

private static final Logger logger = LoggerFactory.getLogger(SpringMessagingEventExternalizerConfiguration.class);

public static final String MODULITH_ROUTING_HEADER = "modulithRouting";
public static final String MODULITH_ROUTING_HEADER = "springModulith_routingTarget";

@Bean
DelegatingEventExternalizer springMessagingEventExternalizer(
EventExternalizationConfiguration configuration,
DelegatingEventExternalizer springMessagingEventExternalizer(EventExternalizationConfiguration configuration,
BeanFactory factory) {

logger.debug("Registering domain event externalization for Spring Messaging…");
Expand All @@ -63,16 +61,19 @@ DelegatingEventExternalizer springMessagingEventExternalizer(
context.setBeanResolver(new BeanFactoryResolver(factory));

return new DelegatingEventExternalizer(configuration, (target, payload) -> {
var routing = BrokerRouting.of(target, context);

var targetChannel = target.getTarget();
var message = MessageBuilder
.withPayload(payload)
.setHeader(MODULITH_ROUTING_HEADER, routing)
.setHeader(MODULITH_ROUTING_HEADER, target.toString())
.build();

if (logger.isDebugEnabled()) {
logger.info("trying to find a {} with name {}", MessageChannel.class.getName(), routing.getTarget());
logger.debug("trying to find a {} with name {}", MessageChannel.class.getName(), targetChannel);
}
var bean = factory.getBean(routing.getTarget(), MessageChannel.class);
bean.send(message);

factory.getBean(targetChannel, MessageChannel.class).send(message);

return CompletableFuture.completedFuture(null);
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2024 the original author or authors.
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,6 @@
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.core.GenericHandler;
import org.springframework.integration.dsl.DirectChannelSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
Expand All @@ -42,12 +41,14 @@
* Integration tests for Spring Messaging-based event publication.
*
* @author Josh Long
* @author Oliver Drotbohm
* @since 1.3
*/
@SpringBootTest
class SpringMessagingEventPublicationIntegrationTests {

private static final String TARGET = "target::#{someExpression}";
private static final String CHANNEL_NAME = "target";

private static final AtomicInteger COUNTER = new AtomicInteger();

@Autowired TestPublisher publisher;
Expand All @@ -62,12 +63,15 @@ TestPublisher testPublisher(ApplicationEventPublisher publisher) {
}

@Bean
IntegrationFlow inboundIntegrationFlow(
@Qualifier(CHANNEL_NAME) MessageChannel inbound) {
IntegrationFlow inboundIntegrationFlow(@Qualifier(CHANNEL_NAME) MessageChannel inbound) {

return IntegrationFlow
.from(inbound)
.handle((GenericHandler<TestEvent>) (payload, headers) -> {
.handle((__, headers) -> {

assertThat(headers.get(SpringMessagingEventExternalizerConfiguration.MODULITH_ROUTING_HEADER))
.isEqualTo(TARGET);

COUNTER.incrementAndGet();
return null;
})
Expand All @@ -83,16 +87,20 @@ DirectChannelSpec target() {

@Test
void publishesEventToSpringMessaging() throws Exception {

var publishes = 2;

for (var i = 0; i < publishes; i++) {
publisher.publishEvent();
}

Thread.sleep(200);

assertThat(COUNTER.get()).isEqualTo(publishes);
assertThat(completed.findAll()).hasSize(publishes);
}

@Externalized(CHANNEL_NAME)
@Externalized(TARGET)
static class TestEvent {}

@RequiredArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
spring.modulith.events.jdbc.schema-initialization.enabled=true
spring.main.banner-mode=OFF
1 change: 1 addition & 0 deletions src/docs/antora/modules/ROOT/pages/appendix.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ a|* `spring-modulith-docs`
|`spring-modulith-events-jms`|`runtime`|Event externalization support for JMS.
|`spring-modulith-events-jpa`|`runtime`|A JPA-based implementation of the `EventPublicationRegistry`.
|`spring-modulith-events-kafka`|`runtime`|Event externalization support for Kafka.
|`spring-modulith-events-messaging`|`runtime`|Event externalization support into Spring Messaging ``MessageChannel``s.
|`spring-modulith-events-mongodb`|`runtime`|A MongoDB-based implementation of the `EventPublicationRegistry`.
|`spring-modulith-events-neo4j`|`runtime`|A Neo4j-based implementation of the `EventPublicationRegistry`.
|`spring-modulith-junit`|`test`|Test execution optimizations based on the application module structure. Find more details xref:testing.adoc#change-aware-test-execution[here].
Expand Down
11 changes: 8 additions & 3 deletions src/docs/antora/modules/ROOT/pages/events.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ To keep application modules as decoupled as possible from each other, their prim
This avoids the originating module to know about all potentially interested parties, which is a key aspect to enable application module integration testing (see xref:testing.adoc[Integration Testing Application Modules]).

Often we will find application components defined like this:

[tabs]
======
Java::
Expand Down Expand Up @@ -124,6 +125,7 @@ class InventoryManagement {
}
----
======

This now effectively decouples the original transaction from the execution of the listener.
While this avoids the expansion of the original business transaction, it also creates a risk: if the listener fails for whatever reason, the event publication is lost, unless each listener actually implements its own safety net.
Even worse, that doesn't even fully work, as the system might fail before the method is even invoked.
Expand Down Expand Up @@ -212,7 +214,9 @@ image::event-publication-registry-end.png[]
[[publication-registry.starters]]
=== Spring Boot Event Registry Starters

Using the transactional event publication log requires a combination of artifacts added to your application. To ease that task, Spring Modulith provides starter POMs that are centered around the <<publication-registry.publication-repositories, persistence technology>> to be used and default to the Jackson-based <<publication-registry.serialization, EventSerializer>> implementation. The following starters are available:
Using the transactional event publication log requires a combination of artifacts added to your application.
To ease that task, Spring Modulith provides starter POMs that are centered around the <<publication-registry.publication-repositories, persistence technology>> to be used and default to the Jackson-based <<publication-registry.serialization, EventSerializer>> implementation.
The following starters are available:

[%header,cols="1,3,6"]
|===
Expand Down Expand Up @@ -306,6 +310,7 @@ Spring Modulith provides a Jackson-based JSON implementation through the `spring

[[publication-registry.customize-publication-date]]
=== Customizing the Event Publication Date

By default, the Event Publication Registry will use the date returned by the `Clock.systemUTC()` as event publication date.
If you want to customize this, register a bean of type clock with the application context:

Expand Down Expand Up @@ -372,7 +377,8 @@ When routing key is set, requires SNS to be configured as a FIFO topic with cont
|Spring Messaging
|`spring-modulith-events-messaging`
|Uses Spring's core `Message` and `MessageChannel` support.
Resolves the target `MessageChannel` by its bean name given the `target` in the `Externalized` annotation. Forwards routing information as a header - called `modulithRouting` - to be processed in whatever way by downstream components, typically in a Spring Integration `IntegrationFlow`.
Resolves the target `MessageChannel` by its bean name given the `target` in the `Externalized` annotation.
Forwards routing information as a header - called `springModulith_routingTarget` - to be processed in whatever way by downstream components, typically in a Spring Integration `IntegrationFlow`.

|===

Expand Down Expand Up @@ -453,7 +459,6 @@ Kotlin::
[[externalization.api]]
=== Programmatic Event Externalization Configuration

The `spring-modulith-events-api` artifact contains `EventExternalizationConfiguration` that allows developers to customize all of the above mentioned steps.

.Programmatically configuring event externalization
Expand Down

0 comments on commit a8835b2

Please sign in to comment.