Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclusive incomplete events republication #871

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion spring-modulith-events/spring-modulith-events-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<!-- <optional>true</optional>-->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* @author Oliver Drotbohm
* @author Björn Kieling
* @author Dmitry Belyaev
* @author Josh Long
*/
@AutoConfiguration
@Import(AsyncEnablingConfiguration.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.config.restart;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.modulith.events.IncompleteEventPublications;
import org.springframework.modulith.events.support.restart.DefaultIncompleteEventPublicationsProcessor;
import org.springframework.modulith.events.support.restart.IncompleteEventPublicationsProcessor;

import java.util.Optional;

/**
* Default behavior configured to resubmit all incomplete events.
*
* @author Josh Long
*/
@AutoConfiguration
@AutoConfigureAfter(ExclusiveIncompleteEventPublicationsProcessorConfiguration.class)
class DefaultIncompleteEventPublicationsProcessorConfiguration {

static final String REPUBLISH_ON_RESTART = "spring.modulith.events.republish-outstanding-events-on-restart";

static final String REPUBLISH_ON_RESTART_LEGACY = "spring.modulith.republish-outstanding-events-on-restart";

@Bean
@ConditionalOnBean(IncompleteEventPublications.class)
@ConditionalOnMissingBean(IncompleteEventPublicationsProcessor.class)
DefaultIncompleteEventPublicationsProcessor defaultIncompleteEventPublicationsProcessor(IncompleteEventPublications publications, Environment environment) {
var republishOnRestart = Optional.ofNullable(environment.getProperty(REPUBLISH_ON_RESTART, Boolean.class))
.orElseGet(() -> environment.getProperty(REPUBLISH_ON_RESTART_LEGACY, Boolean.class));
return new DefaultIncompleteEventPublicationsProcessor(
Boolean.TRUE.equals(republishOnRestart), publications);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.config.restart;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.modulith.events.IncompleteEventPublications;
import org.springframework.modulith.events.support.restart.ExclusiveIncompleteEventPublicationsProcessor;
import org.springframework.modulith.events.support.restart.IncompleteEventPublicationsProcessor;
import org.springframework.util.StringUtils;

import javax.sql.DataSource;

/**
* this configures a default Spring Integration lock delegating to a SQL database for cluster-wide locks.
*
* @author Josh Long
*/
@AutoConfiguration
@ConditionalOnClass({JdbcLockRegistry.class, JdbcTemplate.class, LockRegistry.class})
@ConditionalOnMissingBean(LockRegistry.class)
@ConditionalOnProperty(value = ExclusiveIncompleteEventPublicationsProcessorConfiguration.REPUBLISH_ON_RESTART_LOCK, matchIfMissing = false)
@ConditionalOnBean(DataSource.class)
@AutoConfigureBefore(DefaultIncompleteEventPublicationsProcessorConfiguration.class)
class ExclusiveIncompleteEventPublicationsProcessorConfiguration {

static final String REPUBLISH_ON_RESTART_LOCK = "spring.modulith.events.republish-outstanding-events-on-restart.lock-name";
static final String REPUBLISH_ON_RESTART_TIMEOUT_IN_MILLISECONDS = "spring.modulith.events.republish-outstanding-events-on-restart.lock-timeout";

@Bean
@ConditionalOnBean(DataSource.class)
DefaultLockRepository defaultLockRepository(DataSource dataSource) {
return new DefaultLockRepository(dataSource);
}

@Bean
JdbcLockRegistry jdbcLockRegistry(LockRepository repository) {
return new JdbcLockRegistry(repository);
}

@Bean
@ConditionalOnMissingBean(IncompleteEventPublicationsProcessor.class)
@ConditionalOnBean(IncompleteEventPublications.class)
ExclusiveIncompleteEventPublicationsProcessor exclusiveIncompleteEventPublicationsProcessor(
LockRegistry lockRegistry, Environment environment,
IncompleteEventPublications publications) {
var lockName = environment.getProperty(REPUBLISH_ON_RESTART_LOCK);
var timeoutInMilliseconds = (long) environment.getProperty(
REPUBLISH_ON_RESTART_TIMEOUT_IN_MILLISECONDS, Long.class, -1L);
return new ExclusiveIncompleteEventPublicationsProcessor(StringUtils.hasText(lockName), lockName, timeoutInMilliseconds, publications, lockRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,20 @@
* application restart or via a schedule.
* <p>
* Republication is handled in {@link #afterSingletonsInstantiated()} inspecting the {@link EventPublicationRegistry}
* for incomplete publications and
* for incomplete publications and publishing them all.
*
* @author Oliver Drotbohm
* @see CompletionRegisteringAdvisor
*/
public class PersistentApplicationEventMulticaster extends AbstractApplicationEventMulticaster
implements IncompleteEventPublications, SmartInitializingSingleton {
implements IncompleteEventPublications {

private static final Logger LOGGER = LoggerFactory.getLogger(PersistentApplicationEventMulticaster.class);
private static final Method LEGACY_SHOULD_HANDLE = ReflectionUtils.findMethod(ApplicationListenerMethodAdapter.class,
"shouldHandle", ApplicationEvent.class, Object[].class);
private static final Method SHOULD_HANDLE = ReflectionUtils.findMethod(ApplicationListenerMethodAdapter.class,
"shouldHandle", ApplicationEvent.class);

static final String REPUBLISH_ON_RESTART = "spring.modulith.events.republish-outstanding-events-on-restart";
static final String REPUBLISH_ON_RESTART_LEGACY = "spring.modulith.republish-outstanding-events-on-restart";

private final @NonNull Supplier<EventPublicationRegistry> registry;
private final @NonNull Supplier<Environment> environment;
Expand Down Expand Up @@ -164,25 +162,7 @@ public void resubmitIncompletePublicationsOlderThan(Duration duration) {
doResubmitUncompletedPublicationsOlderThan(duration, __ -> true);
}

/*
* (non-Javadoc)
* @see org.springframework.beans.factory.SmartInitializingSingleton#afterSingletonsInstantiated()
*/
@Override
public void afterSingletonsInstantiated() {

var env = environment.get();

Boolean republishOnRestart = Optional.ofNullable(env.getProperty(REPUBLISH_ON_RESTART, Boolean.class))
.orElseGet(() -> env.getProperty(REPUBLISH_ON_RESTART_LEGACY, Boolean.class));

if (!Boolean.TRUE.equals(republishOnRestart)) {
return;
}

resubmitIncompletePublications(__ -> true);
}


private void invokeTargetListener(TargetEventPublication publication) {

var listeners = new TransactionalEventListeners(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.support.restart;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.modulith.events.IncompleteEventPublications;

/**
* The default implementation when {@code spring.modulith.events.republish-outstanding-events-on-restart} is set to true.
*
* @author Oliver Drotbohm
* @author Josh Long
*/
public class DefaultIncompleteEventPublicationsProcessor implements
IncompleteEventPublicationsProcessor, ApplicationRunner {

private final boolean republishOnRestart;
private final IncompleteEventPublications publications;

public DefaultIncompleteEventPublicationsProcessor(boolean republishOnRestart, IncompleteEventPublications publications) {
this.republishOnRestart = republishOnRestart;
this.publications = publications;
}

// todo can we live with ApplicationRunner#run? or does it have to
// be SmartInitializingSingleton, which runs, notably, before the Boot Actuator health check is healthy and
// could prevent the app from starting up in time?
@Override
public void run(ApplicationArguments args) throws Exception {

if (!republishOnRestart) {
return;
}
try {
this.process();
}//
catch (Throwable e) {
throw new RuntimeException(e);
}
}


protected void process() throws Throwable {
this.publications.resubmitIncompletePublications(a -> true);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.support.restart;

import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.modulith.events.IncompleteEventPublications;

import java.time.Duration;

/**
* uses Spring Integration's {@link LockRegistry} to obtain
* an exclusive, cluster-wide lock before resubmitting incomplete event publications.
*
* @author Josh Long
*/
public class ExclusiveIncompleteEventPublicationsProcessor extends DefaultIncompleteEventPublicationsProcessor
implements IncompleteEventPublicationsProcessor {

private final String lockName;
private final long timeoutInMilliseconds;
private final LockRegistry registry;

public ExclusiveIncompleteEventPublicationsProcessor(
boolean republishOnRestart, String lockName, long timeoutInMilliseconds, IncompleteEventPublications publications, LockRegistry registry) {
super(republishOnRestart, publications);
this.lockName = lockName;
this.timeoutInMilliseconds = timeoutInMilliseconds;
this.registry = registry;
}

@Override
public void process() throws Throwable {
registry.executeLocked(lockName, Duration.ofMillis(timeoutInMilliseconds == -1 ? 1_000 : timeoutInMilliseconds), () -> {
super.process();
return null;
});
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.support.restart;

/**
* Marker interface for components that, on restart, process incomplete event publications
*
* @author Josh Long
*/
public interface IncompleteEventPublicationsProcessor {
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@
"description": "Whether to configure defaults for the async processing termination, namely to wait for task completion for 2 seconds. See TaskExecutionProperties for details.",
"defaultValue": "true"
},

{
"name": "spring.modulith.events.republish-outstanding-events-on-restart.lock-timeout",
"type": "java.lang.Long",
"description": "How long to wait to try to acquire the shared Spring Integration Lock before attempting to proces incomplete event publications on startup",
"defaultValue": "true"
},

{
"name": "spring.modulith.events.republish-outstanding-events-on-restart.lock-name",
"type" : "java.lang.String",
"description" : "the name of the shared Spring Integration Lock to acquire before attempting to process incomplete event publications on startup"
},

{
"name": "spring.modulith.republish-outstanding-events-on-restart",
"type": "java.lang.Boolean",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
org.springframework.modulith.events.config.EventPublicationAutoConfiguration
org.springframework.modulith.events.config.EventExternalizationAutoConfiguration
org.springframework.modulith.events.config.restart.ExclusiveIncompleteEventPublicationsProcessorConfiguration
org.springframework.modulith.events.config.restart.DefaultIncompleteEventPublicationsProcessorConfiguration
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void setUp() {
this.multicaster = new PersistentApplicationEventMulticaster(() -> registry, () -> environment);
}

/*
@Test // GH-240, GH-251
void doesNotRepublishEventsOnRestartByDefault() {

Expand Down Expand Up @@ -87,7 +88,7 @@ void triggersRepublicationIfLegacyConfigExplicitlyEnabled() {

verify(registry).processIncompletePublications(any(), any(), any());
}

*/
@Test // GH-277
void honorsListenerCondition() throws Exception {

Expand Down
Loading