From 358d4a5abd012f37b70caa064e60f08cd406d5a7 Mon Sep 17 00:00:00 2001 From: Jeppe Cramon <96371136+cloudcreate-dk@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:28:28 +0100 Subject: [PATCH 1/5] - Added new configuration properties to application.properties --- .../src/main/resources/application.properties | 3 ++- postgresql-cqrs/src/main/resources/application.properties | 6 +++++- .../src/main/resources/application.properties | 8 +++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/mongodb-inbox-outbox/src/main/resources/application.properties b/mongodb-inbox-outbox/src/main/resources/application.properties index e6b66b1..0b28f86 100644 --- a/mongodb-inbox-outbox/src/main/resources/application.properties +++ b/mongodb-inbox-outbox/src/main/resources/application.properties @@ -19,7 +19,8 @@ essentials.immutable-jackson-module-enabled=true essentials.reactive.event-bus-backpressure-buffer-size=1024 essentials.reactive.overflow-max-retries=20 essentials.reactive.queued-task-cap-factor=1.5 -#essentials.reactive.parallel-threads=4 +#essentials.reactive.event-bus-parallel-threads=4 +#essentials.reactive.command-bus-parallel-send-and-dont-wait-consumers=4 essentials.durable-queues.shared-queue-collection-name=durable_queues essentials.durable-queues.message-handling-timeout=5s diff --git a/postgresql-cqrs/src/main/resources/application.properties b/postgresql-cqrs/src/main/resources/application.properties index 8a0f3fa..e58e947 100644 --- a/postgresql-cqrs/src/main/resources/application.properties +++ b/postgresql-cqrs/src/main/resources/application.properties @@ -19,7 +19,11 @@ essentials.immutable-jackson-module-enabled=true essentials.reactive.event-bus-backpressure-buffer-size=1024 essentials.reactive.overflow-max-retries=20 essentials.reactive.queued-task-cap-factor=1.5 -#essentials.reactive.parallel-threads=4 +#essentials.reactive.event-bus-parallel-threads=4 +#essentials.reactive.command-bus-parallel-send-and-dont-wait-consumers=4 + +essentials.multi-table-change-listener.filter-duplicate-notifications=true +essentials.multi-table-change-listener.polling-interval=100ms essentials.event-store.identifier-column-type=text essentials.event-store.json-column-type=jsonb diff --git a/postgresql-inbox-outbox/src/main/resources/application.properties b/postgresql-inbox-outbox/src/main/resources/application.properties index 7a1260b..352e55b 100644 --- a/postgresql-inbox-outbox/src/main/resources/application.properties +++ b/postgresql-inbox-outbox/src/main/resources/application.properties @@ -19,7 +19,11 @@ essentials.immutable-jackson-module-enabled=true essentials.reactive.event-bus-backpressure-buffer-size=1024 essentials.reactive.overflow-max-retries=20 essentials.reactive.queued-task-cap-factor=1.5 -#essentials.reactive.parallel-threads=4 +#essentials.reactive.event-bus-parallel-threads=4 +#essentials.reactive.command-bus-parallel-send-and-dont-wait-consumers=4 + +essentials.multi-table-change-listener.filter-duplicate-notifications=true +essentials.multi-table-change-listener.polling-interval=100ms essentials.durable-queues.shared-queue-table-name=durable_queues essentials.durable-queues.polling-delay-interval-increment-factor=0.5 @@ -36,6 +40,8 @@ spring.jpa.generate-ddl=true spring.jpa.hibernate.ddl-auto=create-drop spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.PostgreSQLDialect +spring.datasource.hikari.maximum-pool-size=25 + spring.application.name=postgresql-inbox-outbox spring.datasource.url=jdbc:postgresql://localhost:5432/essentials From 32fb50324179b2ac07acb9551a184c6aee70e705 Mon Sep 17 00:00:00 2001 From: Jeppe Cramon <96371136+cloudcreate-dk@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:31:20 +0100 Subject: [PATCH 2/5] Added `Task` / `TaskProcessor` example that uses the new `InTransactionEventProcessor` and which tests modifying the same aggregate multiple times in the same UnitOfWork through a combination of Commands and Event handlers --- .../cqrs/task/TaskEventStoreRepository.java | 61 +++++++++ .../postgresql/cqrs/task/TaskProcessor.java | 80 ++++++++++++ .../cqrs/task/commands/AddComment.java | 24 ++++ .../cqrs/task/commands/CreateTask.java | 22 ++++ .../cqrs/task/commands/TaskCommand.java | 20 +++ .../postgresql/cqrs/task/domain/Comment.java | 22 ++++ .../postgresql/cqrs/task/domain/Task.java | 69 ++++++++++ .../postgresql/cqrs/task/domain/TaskId.java | 35 ++++++ .../cqrs/task/domain/events/CommentAdded.java | 32 +++++ .../cqrs/task/domain/events/TaskCreated.java | 32 +++++ .../cqrs/task/domain/events/TaskEvent.java | 24 ++++ .../postgresql/cqrs/task/TaskProcessorIT.java | 118 ++++++++++++++++++ .../src/test/resources/logback-test.xml | 5 + 13 files changed, 544 insertions(+) create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskEventStoreRepository.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskProcessor.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/AddComment.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/CreateTask.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/TaskCommand.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/Comment.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/Task.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/TaskId.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/CommentAdded.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/TaskCreated.java create mode 100644 postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/TaskEvent.java create mode 100644 postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskProcessorIT.java diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskEventStoreRepository.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskEventStoreRepository.java new file mode 100644 index 0000000..90c1f6a --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskEventStoreRepository.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task; + +import dk.cloudcreate.essentials.components.eventsourced.aggregates.stateful.StatefulAggregateRepository; +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore; +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType; +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.table_per_aggregate_type.SeparateTablePerAggregateEventStreamConfiguration; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.CreateTask; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.Task; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.TaskEvent; +import lombok.NonNull; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +import static dk.cloudcreate.essentials.components.eventsourced.aggregates.stateful.StatefulAggregateInstanceFactory.reflectionBasedAggregateRootFactory; + +@Component +public class TaskEventStoreRepository { + + public static final AggregateType AGGREGATE_TYPE = AggregateType.of("Task"); + private final ConfigurableEventStore eventStore; + private final StatefulAggregateRepository repository; + + public TaskEventStoreRepository(@NonNull ConfigurableEventStore eventStore) { + this.eventStore = eventStore; + repository = StatefulAggregateRepository.from(eventStore, + AGGREGATE_TYPE, + reflectionBasedAggregateRootFactory(), + Task.class); + } + + public Optional findTask(@NonNull TaskId taskId) { + return repository.tryLoad(taskId); + } + + public Task getTask(@NonNull TaskId taskId) { + return repository.load(taskId); + } + + public Task createTask(@NonNull TaskId taskId, CreateTask cmd) { + var task = new Task(taskId, cmd); + return repository.save(task); + } +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskProcessor.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskProcessor.java new file mode 100644 index 0000000..84f0c79 --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskProcessor.java @@ -0,0 +1,80 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task; + +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType; +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.processor.*; +import dk.cloudcreate.essentials.components.foundation.messaging.MessageHandler; +import dk.cloudcreate.essentials.reactive.command.CmdHandler; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.*; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.Task; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.TaskCreated; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.List; + +import static java.util.Objects.nonNull; + +@Slf4j +@Service +public class TaskProcessor extends InTransactionEventProcessor { + + private final TaskEventStoreRepository taskEventStoreRepository; + + protected TaskProcessor(TaskEventStoreRepository taskEventStoreRepository, + EventProcessorDependencies eventProcessorDependencies) { + super(eventProcessorDependencies, true); + this.taskEventStoreRepository = taskEventStoreRepository; + } + + public TaskEventStoreRepository getTaskEventStoreRepository() { + return taskEventStoreRepository; + } + + @Override + public String getProcessorName() { + return "TaskProcessor"; + } + + @Override + protected List reactsToEventsRelatedToAggregateTypes() { + return List.of(TaskEventStoreRepository.AGGREGATE_TYPE); + } + + @CmdHandler + public void handle(CreateTask cmd) { + log.info("Creating task with command '{}'", cmd); + taskEventStoreRepository.createTask(cmd.taskId(), cmd); + + } + + @CmdHandler + public void handle(AddComment cmd) { + log.info("Adding comment '{}'", cmd); + Task task = taskEventStoreRepository.findTask(cmd.taskId()).orElseThrow(); + task.addComment(cmd); + } + + @MessageHandler + void handle(TaskCreated event) { + if (nonNull(event.getComment())) { + log.info("Task '{}' contains comment adding comment command", event); + commandBus.send(new AddComment(event.getTaskId(), event.getComment(), event.getCreatedAt())); + } + } +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/AddComment.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/AddComment.java new file mode 100644 index 0000000..eeb7f02 --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/AddComment.java @@ -0,0 +1,24 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands; + +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId; + +import java.time.LocalDateTime; + +public record AddComment(TaskId taskId, String content, LocalDateTime createdAt) implements TaskCommand { +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/CreateTask.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/CreateTask.java new file mode 100644 index 0000000..fcbc5b0 --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/CreateTask.java @@ -0,0 +1,22 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands; + +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId; + +public record CreateTask(TaskId taskId, String comment) implements TaskCommand { +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/TaskCommand.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/TaskCommand.java new file mode 100644 index 0000000..f9f2d68 --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/commands/TaskCommand.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands; + +public interface TaskCommand { +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/Comment.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/Comment.java new file mode 100644 index 0000000..407e5cb --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/Comment.java @@ -0,0 +1,22 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain; + +import java.time.LocalDateTime; + +public record Comment(TaskId taskId, String content, LocalDateTime createdAt) { +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/Task.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/Task.java new file mode 100644 index 0000000..09c58dd --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/Task.java @@ -0,0 +1,69 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain; + +import dk.cloudcreate.essentials.components.eventsourced.aggregates.EventHandler; +import dk.cloudcreate.essentials.components.eventsourced.aggregates.stateful.modern.AggregateRoot; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.AddComment; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.CreateTask; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.CommentAdded; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.TaskCreated; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.TaskEvent; +import lombok.ToString; + +import java.time.LocalDateTime; +import java.util.HashSet; +import java.util.Set; + +@ToString +public class Task extends AggregateRoot { + + private Set comments; + + public Task(TaskId aggregateId) { + super(aggregateId); + } + + public Task(TaskId aggregateId, CreateTask cmd) { + super(aggregateId); + apply(new TaskCreated(aggregateId, + cmd.comment(), + LocalDateTime.now() + )); + } + + public void addComment(AddComment cmd) { + Comment comment = new Comment(cmd.taskId(), cmd.content(), cmd.createdAt()); + if (!comments.contains(comment)) { + apply(new CommentAdded(cmd.taskId(), cmd.content(), cmd.createdAt())); + } + } + + public Set getComments() { + return comments; + } + + @EventHandler + private void on(TaskCreated event) { + comments = new HashSet<>(); + } + + @EventHandler + private void on(CommentAdded event) { + comments.add(new Comment(event.getTaskId(), event.getContent(), event.getCreatedAt())); + } +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/TaskId.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/TaskId.java new file mode 100644 index 0000000..1892570 --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/TaskId.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain; + +import dk.cloudcreate.essentials.components.foundation.types.RandomIdGenerator; +import dk.cloudcreate.essentials.types.CharSequenceType; + +public class TaskId extends CharSequenceType { + + public TaskId(CharSequence value) { + super(value); + } + + public static TaskId random() { + return new TaskId(RandomIdGenerator.generate()); + } + + public static TaskId of(CharSequence id) { + return new TaskId(id); + } +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/CommentAdded.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/CommentAdded.java new file mode 100644 index 0000000..f92a621 --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/CommentAdded.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events; + +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.time.LocalDateTime; + +@Getter +@AllArgsConstructor +public class CommentAdded implements TaskEvent { + + private TaskId taskId; + private String content; + private LocalDateTime createdAt; +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/TaskCreated.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/TaskCreated.java new file mode 100644 index 0000000..52bf974 --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/TaskCreated.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events; + +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.time.LocalDateTime; + +@Getter +@AllArgsConstructor +public class TaskCreated implements TaskEvent { + + private TaskId taskId; + private String comment; + private LocalDateTime createdAt; +} diff --git a/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/TaskEvent.java b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/TaskEvent.java new file mode 100644 index 0000000..89d5431 --- /dev/null +++ b/postgresql-cqrs/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/domain/events/TaskEvent.java @@ -0,0 +1,24 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events; + +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId; + +public interface TaskEvent { + + TaskId getTaskId(); +} diff --git a/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskProcessorIT.java b/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskProcessorIT.java new file mode 100644 index 0000000..62d79df --- /dev/null +++ b/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/task/TaskProcessorIT.java @@ -0,0 +1,118 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task; + +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore; +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.bus.*; +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.table_per_aggregate_type.SeparateTablePerAggregateEventStreamConfiguration; +import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.transaction.*; +import dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.TestApplication; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.CreateTask; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId; +import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.*; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.*; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.*; + +import java.time.Duration; +import java.util.*; + +import static org.assertj.core.api.Assertions.assertThat; + + +@Slf4j +@Testcontainers +@DirtiesContext +@SpringBootTest(classes = TestApplication.class) +public class TaskProcessorIT { + + @Container + static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>("postgres:latest") + .withDatabaseName("test") + .withPassword("test") + .withUsername("test"); + + @Autowired + private TaskProcessor eventProcessor; + @Autowired + private ConfigurableEventStore eventStore; + + @DynamicPropertySource + static void setProperties(DynamicPropertyRegistry registry) { + registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl); + registry.add("spring.datasource.password", postgreSQLContainer::getPassword); + registry.add("spring.datasource.username", postgreSQLContainer::getUsername); + } + + @Autowired + private DurableLocalCommandBus commandBus; + @Autowired + protected EventStoreEventBus eventStoreEventBus; + @Autowired + protected EventStoreUnitOfWorkFactory unitOfWorkFactory; + + protected List collectedEvents = new ArrayList<>(); + + protected void collectPersistedEvents() { + eventStoreEventBus.addSyncSubscriber(event -> { + if (event instanceof PersistedEvents persistedEvents) { + if (persistedEvents.commitStage == CommitStage.BeforeCommit && !persistedEvents.events.isEmpty()) { + log.info("Collected persisted events : {}", persistedEvents.events); + collectedEvents.add(persistedEvents); + } + } + }); + } + + /** + * This test involves creating Task aggregate with CreateTask command and with a second command AddComment + * add comment to the just created Task aggregate using the same unit of work. This flow is impl. in TaskProcessor + */ + @Test + public void create_task_and_comment_in_same_unit_of_work() { + collectPersistedEvents(); + Awaitility.waitAtMost(Duration.ofMillis(300)).until(() -> eventProcessor.isActive()); + + TaskId taskId = TaskId.random(); + String comment = "This is a good comment!"; + + CreateTask cmd = new CreateTask(taskId, comment); + System.out.println("Sending Command ---->"); + commandBus.send(cmd); + System.out.println("Command Sent <----"); + + assertThat(collectedEvents).hasSize(2); + assertThat(collectedEvents.get(0).events).hasSize(1); + assertThat(collectedEvents.get(1).events).hasSize(1); + var taskCreated = collectedEvents.get(0).events.get(0).event().deserialize(); + assertThat(taskCreated).isExactlyInstanceOf(TaskCreated.class); + assertThat(collectedEvents.get(0).events.get(0).aggregateId()).isEqualTo(taskId); + var commentAdded = collectedEvents.get(1).events.get(0).event().deserialize(); + assertThat(commentAdded).isExactlyInstanceOf(CommentAdded.class); + assertThat(collectedEvents.get(1).events.get(0).aggregateId()).isEqualTo(taskId); + + var task = unitOfWorkFactory.withUnitOfWork(() -> eventProcessor.getTaskEventStoreRepository().getTask(taskId)); + assertThat(task.getComments()).hasSize(1); + } +} diff --git a/postgresql-cqrs/src/test/resources/logback-test.xml b/postgresql-cqrs/src/test/resources/logback-test.xml index a7bd723..76293dd 100644 --- a/postgresql-cqrs/src/test/resources/logback-test.xml +++ b/postgresql-cqrs/src/test/resources/logback-test.xml @@ -34,6 +34,11 @@ + + + + + From c4bc6a3fb78c39c69e2ad59dbc0299726ceb0c86 Mon Sep 17 00:00:00 2001 From: Jeppe Cramon <96371136+cloudcreate-dk@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:34:50 +0100 Subject: [PATCH 3/5] - Adjusted ShippingOrder/ShippingOrders design to use string instead of OrderId as primary key - Added LoadOrderShippingProcessor which in combination with LoadOrderShippingProcessorIT stress tests the DurableQueues and LocalEventBus - Introduced AbstractIntegrationTest - Added DurableQueuesLoadIT for stress testing DurableQueues indexes --- .../shipping/LoadOrderShippingProcessor.java | 114 +++++++++++++++ .../commands/RecreateShippingOrderView.java | 22 +++ .../commands/RecreateShippingOrderViews.java | 23 +++ .../shipping/domain/ShippingOrder.java | 9 +- .../shipping/domain/ShippingOrders.java | 10 +- .../src/main/resources/logback-spring.xml | 7 +- .../messaging/AbstractIntegrationTest.java | 83 +++++++++++ .../messaging/DurableQueuesLoadIT.java | 137 ++++++++++++++++++ .../postgresql/messaging/TestApplication.java | 2 + .../LoadOrderShippingProcessorIT.java | 109 ++++++++++++++ .../shipping/OrderShippingProcessorIT.java | 62 +------- .../src/test/resources/logback-test.xml | 17 ++- 12 files changed, 525 insertions(+), 70 deletions(-) create mode 100644 postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessor.java create mode 100644 postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/commands/RecreateShippingOrderView.java create mode 100644 postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/commands/RecreateShippingOrderViews.java create mode 100644 postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/AbstractIntegrationTest.java create mode 100644 postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/DurableQueuesLoadIT.java create mode 100644 postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessorIT.java diff --git a/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessor.java b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessor.java new file mode 100644 index 0000000..5de773a --- /dev/null +++ b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessor.java @@ -0,0 +1,114 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping; + +import dk.cloudcreate.essentials.components.foundation.Lifecycle; +import dk.cloudcreate.essentials.components.foundation.messaging.*; +import dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.*; +import dk.cloudcreate.essentials.reactive.Handler; +import dk.cloudcreate.essentials.reactive.command.*; +import dk.cloudcreate.essentials.shared.time.StopWatch; +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.commands.*; +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.domain.ShippingOrders; +import lombok.Getter; +import org.slf4j.*; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +@Service +public class LoadOrderShippingProcessor extends AnnotatedCommandHandler implements Lifecycle { + + private static final Logger log = LoggerFactory.getLogger(LoadOrderShippingProcessor.class); + private final Inboxes inboxes; + private final CommandBus commandBus; + private final ShippingOrders shippingOrders; + @Getter + private Inbox inbox; + + @Getter + private AtomicInteger receivedRecreateShippingOrderView = new AtomicInteger(0); + + private volatile boolean started = false; + + public LoadOrderShippingProcessor(Inboxes inboxes, + CommandBus commandBus, + ShippingOrders shippingOrders) { + this.inboxes = inboxes; + this.commandBus = commandBus; + this.shippingOrders = shippingOrders; + } + + @Handler + public void handle(RecreateShippingOrderViews cmd) { + StopWatch stopWatch = StopWatch.start(); + AtomicInteger count = new AtomicInteger(0); + log.info("Received RecreateShippingOrderViews '{}'", cmd); + var ids = shippingOrders.findAllOrderIds(); + ids.forEach(orderId -> { + //log.debug("Publishing order id '{}'", orderId); + count.getAndIncrement(); + publishToInbox(new RecreateShippingOrderView(OrderId.of(orderId))); + }); + + log.info("Took '{}' to publish '{}' shipping orders count '{}'", stopWatch.stop(), ids.size(), count.get()); + } + + public void publishToInbox(RecreateShippingOrderView cmd) { + inbox.addMessageReceived(cmd); + } + + @Handler + public void handle(RecreateShippingOrderView cmd) { + log.debug("---------------------> Received RecreateShippingOrderView '{}' with id '{}'", cmd, cmd.id()); + receivedRecreateShippingOrderView.getAndIncrement(); + } + + @Override + public void start() { + if (!started) { + log.info("Starting LoadOrderShippingProcessor with {} threads", 20); + started = true; + inbox = inboxes.getOrCreateInbox(InboxConfig.builder() + .inboxName(InboxName.of("load-test")) + .redeliveryPolicy(RedeliveryPolicy.fixedBackoff() + .setRedeliveryDelay(Duration.ZERO) + .setDeliveryErrorHandler(new MessageDeliveryErrorHandler.NeverRetry()) + .setMaximumNumberOfRedeliveries(0) + .build()) + .messageConsumptionMode(MessageConsumptionMode.SingleGlobalConsumer) + .numberOfParallelMessageConsumers(20) + .build(), + commandBus + ); + } + } + + @Override + public void stop() { + if (started) { + started = false; + inbox.stopConsuming(); + } + } + + @Override + public boolean isStarted() { + return false; + } +} diff --git a/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/commands/RecreateShippingOrderView.java b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/commands/RecreateShippingOrderView.java new file mode 100644 index 0000000..fae3193 --- /dev/null +++ b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/commands/RecreateShippingOrderView.java @@ -0,0 +1,22 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.commands; + +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.OrderId; + +public record RecreateShippingOrderView(OrderId id) { +} diff --git a/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/commands/RecreateShippingOrderViews.java b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/commands/RecreateShippingOrderViews.java new file mode 100644 index 0000000..00cb0c2 --- /dev/null +++ b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/commands/RecreateShippingOrderViews.java @@ -0,0 +1,23 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.commands; + + +import java.time.OffsetDateTime; + +public record RecreateShippingOrderViews(OffsetDateTime timestamp) { +} diff --git a/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/domain/ShippingOrder.java b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/domain/ShippingOrder.java index 9c3aa2a..fee0c55 100644 --- a/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/domain/ShippingOrder.java +++ b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/domain/ShippingOrder.java @@ -16,7 +16,6 @@ package dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.domain; -import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.OrderId; import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.commands.RegisterShippingOrder; import jakarta.persistence.*; import lombok.*; @@ -28,16 +27,16 @@ @AllArgsConstructor @Builder public class ShippingOrder { - @EmbeddedId + @Id @EqualsAndHashCode.Include - @Column(name="order_id") - private OrderId id; + @Column(name = "order_id") + private String id; private boolean shipped; @Embedded private ShippingDestinationAddress destinationAddress; public ShippingOrder(RegisterShippingOrder cmd) { - this.id = cmd.orderId; + this.id = cmd.orderId.toString(); destinationAddress = cmd.destinationAddress; } diff --git a/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/domain/ShippingOrders.java b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/domain/ShippingOrders.java index 035c86d..13d2a67 100644 --- a/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/domain/ShippingOrders.java +++ b/postgresql-inbox-outbox/src/main/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/domain/ShippingOrders.java @@ -19,12 +19,13 @@ import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.OrderId; import lombok.NonNull; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; import java.util.*; -public interface ShippingOrders extends JpaRepository { +public interface ShippingOrders extends JpaRepository { default Optional findOrder(@NonNull OrderId orderId) { - return findById(orderId); + return findById(orderId.toString()); } default ShippingOrder getOrder(@NonNull OrderId orderId) { @@ -36,5 +37,8 @@ default void registerNewOrder(@NonNull ShippingOrder order) { } List findByShipped(boolean shippedStatus); - List findByIdIn(List ids); + List findByIdIn(List ids); + + @Query("SELECT id FROM ShippingOrder") + List findAllOrderIds(); } diff --git a/postgresql-inbox-outbox/src/main/resources/logback-spring.xml b/postgresql-inbox-outbox/src/main/resources/logback-spring.xml index 76c68f9..c80d65e 100644 --- a/postgresql-inbox-outbox/src/main/resources/logback-spring.xml +++ b/postgresql-inbox-outbox/src/main/resources/logback-spring.xml @@ -37,10 +37,13 @@ + - - + + + + diff --git a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/AbstractIntegrationTest.java b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/AbstractIntegrationTest.java new file mode 100644 index 0000000..5ba2e72 --- /dev/null +++ b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/AbstractIntegrationTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.messaging; + +import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues; +import dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus; +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.OrderShippingProcessor; +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.adapters.kafka.outgoing.ShippingEventKafkaPublisher; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.*; +import org.testcontainers.containers.*; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.*; +import org.testcontainers.utility.DockerImageName; + +import java.util.List; + +@Testcontainers +@DirtiesContext +@SpringBootTest(classes = TestApplication.class) +public class AbstractIntegrationTest { + + @Container + protected static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>("postgres:latest") + .withDatabaseName("test") + .withPassword("test") + .withUsername("test"); + + @Container + static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + protected KafkaMessageListenerContainer kafkaListenerContainer; + + @DynamicPropertySource + protected static void setProperties(DynamicPropertyRegistry registry) { + registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl); + registry.add("spring.datasource.password", postgreSQLContainer::getPassword); + registry.add("spring.datasource.username", postgreSQLContainer::getUsername); + + registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); + } + + @Autowired + protected KafkaTemplate kafkaTemplate; + + + @Autowired + protected OrderShippingProcessor orderShippingProcessor; + + @Autowired + protected ShippingEventKafkaPublisher shippingEventKafkaPublisher; + + @Autowired + protected DurableLocalCommandBus commandBus; + + @Autowired + protected DurableQueues durableQueues; + + @Autowired + protected ConsumerFactory kafkaConsumerFactory; + + protected List> shippingRecordsReceived; + + +} diff --git a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/DurableQueuesLoadIT.java b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/DurableQueuesLoadIT.java new file mode 100644 index 0000000..bee3db5 --- /dev/null +++ b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/DurableQueuesLoadIT.java @@ -0,0 +1,137 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.messaging; + + +import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy; +import dk.cloudcreate.essentials.components.foundation.messaging.queue.*; +import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue; +import dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus; +import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory; +import dk.cloudcreate.essentials.components.foundation.types.CorrelationId; +import dk.cloudcreate.essentials.shared.time.StopWatch; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.*; +import org.testcontainers.containers.*; +import org.testcontainers.junit.jupiter.*; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; + +import java.time.*; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +import static dk.cloudcreate.essentials.shared.MessageFormatter.msg; +import static org.assertj.core.api.Assertions.assertThat; + +@SpringBootTest(classes = TestApplication.class) +@Testcontainers +@DirtiesContext +public class DurableQueuesLoadIT { + @Container + static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>("postgres:latest") + .withDatabaseName("test") + .withPassword("test") + .withUsername("test"); + @Container + static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + + @DynamicPropertySource + static void setProperties(DynamicPropertyRegistry registry) { + registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl); + registry.add("spring.datasource.password", postgreSQLContainer::getPassword); + registry.add("spring.datasource.username", postgreSQLContainer::getUsername); + + registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); + } + + private DurableQueueConsumer consumer; + @Autowired + private DurableLocalCommandBus commandBus; + + @Autowired + private UnitOfWorkFactory unitOfWorkFactory; + + @Autowired + private DurableQueues durableQueues; + + @AfterEach + void cleanup() { + if (consumer != null) { + consumer.cancel(); + } + } + + @Test + void queue_a_large_number_of_messages() { + // Given + var queueName = QueueName.of("TestQueue"); + var now = Instant.now(); + + var msgHandler = new RecordingQueuedMessageHandler(); + consumer = durableQueues.consumeFromQueue(ConsumeFromQueue.builder() + .setQueueName(queueName) + .setRedeliveryPolicy(RedeliveryPolicy.fixedBackoff(Duration.ofMillis(100), 0)) + .setParallelConsumers(1) + .setConsumerName("TestConsumer") + .setQueueMessageHandler(msgHandler) + .build()); + + var count = 20000; + var stopwatch = StopWatch.start(); + unitOfWorkFactory.usingUnitOfWork(uow -> { + IntStream.range(0, count).forEach(i -> { + durableQueues.queueMessage(queueName, + Message.of(("Message" + i), + MessageMetaData.of("correlation_id", CorrelationId.random(), + "trace_id", UUID.randomUUID().toString()))); + }); + }); + System.out.println(msg("-----> {} Queueing {} messages took {}", Instant.now(), count, stopwatch.stop())); + + assertThat(durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(count); + var nextMessages = durableQueues.queryForMessagesSoonReadyForDelivery(queueName, + now, + 10); + assertThat(nextMessages).hasSize(10); + + + Awaitility.waitAtMost(Duration.ofSeconds(10)) + .untilAsserted(() -> { + System.out.println("-----> " + Instant.now() + " messages received: " + msgHandler.messagesReceived.get()); + assertThat(msgHandler.messagesReceived.get()).isGreaterThan(10); + }); + consumer.cancel(); + consumer = null; + + } + + static class RecordingQueuedMessageHandler implements QueuedMessageHandler { + AtomicLong messagesReceived = new AtomicLong(); + + @Override + public void handle(QueuedMessage message) { + messagesReceived.getAndIncrement(); + } + } +} \ No newline at end of file diff --git a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/TestApplication.java b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/TestApplication.java index e1c1d59..c80c95e 100644 --- a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/TestApplication.java +++ b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/TestApplication.java @@ -20,4 +20,6 @@ @SpringBootApplication public class TestApplication { + + } diff --git a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessorIT.java b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessorIT.java new file mode 100644 index 0000000..2adb1bb --- /dev/null +++ b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessorIT.java @@ -0,0 +1,109 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping; + +import dk.cloudcreate.essentials.shared.time.StopWatch; +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.AbstractIntegrationTest; +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.commands.RecreateShippingOrderViews; +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.domain.*; +import lombok.SneakyThrows; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.slf4j.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; + +import java.time.*; +import java.util.*; + +import static org.assertj.core.api.Assertions.assertThat; + +public class LoadOrderShippingProcessorIT extends AbstractIntegrationTest { + + private static final Logger log = LoggerFactory.getLogger(LoadOrderShippingProcessorIT.class); + + private static final int NUMBER_OF_INSERTS = 15000; + + @Autowired + private ShippingOrders shippingOrders; + + @Autowired + private LoadOrderShippingProcessor loadOrderShippingProcessor; + + @SneakyThrows + @Test + public void stress_test_durable_queues_and_local_eventbus() { + + insertTestData(); + + log.debug("Sending RecreateShippingOrderViews"); + commandBus.send(new RecreateShippingOrderViews(OffsetDateTime.now())); + + var queueName = loadOrderShippingProcessor.getInbox().name().asQueueName(); + long totalMessagesQueuedFor = durableQueues.getTotalMessagesQueuedFor(queueName); + log.debug("########## TotalMessagesQueued for '{}': '{}'", queueName, totalMessagesQueuedFor); + + var stopWatch = StopWatch.start(); + Awaitility.waitAtMost(Duration.ofMinutes(60)) + .pollDelay(Duration.ofSeconds(5)) + .pollInterval(Duration.ofSeconds(10)) + .until(() -> { + long currentlyQueuedFor = durableQueues.getTotalMessagesQueuedFor(queueName); + log.debug("==========================================================================================="); + log.debug("########## TotalMessagesQueued for '{}': '{}' after {}", queueName, currentlyQueuedFor, stopWatch.elapsed()); + log.debug("########## ReceivedRecreateShippingOrderView Messages: '{}'", loadOrderShippingProcessor.getReceivedRecreateShippingOrderView()); + log.debug("########## DeadLetterMessages for '{}': '{}'", queueName, durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)); + return loadOrderShippingProcessor.getReceivedRecreateShippingOrderView().get() == totalMessagesQueuedFor; + }); + + assertThat(durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)).isEqualTo(0); + } + + + @Transactional + protected void insertTestData() { + var stopWatch = StopWatch.start(); + var batchSize = 100; + List batch = new ArrayList<>(batchSize); + for (int i = 0; i < NUMBER_OF_INSERTS; i++) { + var shippingOrder = new ShippingOrder(); + shippingOrder.setId(OrderId.random().toString()); + batch.add(shippingOrder); + + if (batch.size() == batchSize) { + try { + shippingOrders.saveAll(batch); + } catch (Exception e) { + log.error("Failed to insert batch: {}", e.getMessage(), e); + throw e; + } + batch = new ArrayList<>(batchSize); + } + + if (i % 1000 == 0) { + log.debug("Have inserted {} ShippingOrders so far", i); + } + } + if (!batch.isEmpty()) { + log.debug("Have inserted remaining {} ShippingOrders", batch.size()); + shippingOrders.saveAll(batch); + } + long count = shippingOrders.count(); + log.info("Took '{}' to insert '{}' shipping orders count '{}'", stopWatch.stop(), NUMBER_OF_INSERTS, count); + } + +} diff --git a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/OrderShippingProcessorIT.java b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/OrderShippingProcessorIT.java index fb2a968..fea9406 100644 --- a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/OrderShippingProcessorIT.java +++ b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/OrderShippingProcessorIT.java @@ -16,79 +16,25 @@ package dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping; -import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues; -import dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus; -import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.TestApplication; +import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.AbstractIntegrationTest; import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.adapters.kafka.incoming.*; import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.adapters.kafka.outgoing.*; import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.commands.RegisterShippingOrder; import dk.cloudcreate.essentials.spring.examples.postgresql.messaging.shipping.domain.ShippingDestinationAddress; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.*; import org.slf4j.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.kafka.core.*; import org.springframework.kafka.listener.*; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.*; -import org.testcontainers.containers.*; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.*; import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; import java.time.Duration; -import java.util.*; +import java.util.ArrayList; import static org.assertj.core.api.Assertions.assertThat; -@SpringBootTest(classes = TestApplication.class) -@Testcontainers -@DirtiesContext -public class OrderShippingProcessorIT { - private static final Logger log = LoggerFactory.getLogger(OrderShippingProcessorIT.class); - - @Container - static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>("postgres:latest") - .withDatabaseName("test") - .withPassword("test") - .withUsername("test"); - - @Container - static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); - private KafkaMessageListenerContainer kafkaListenerContainer; - - @DynamicPropertySource - static void setProperties(DynamicPropertyRegistry registry) { - registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl); - registry.add("spring.datasource.password", postgreSQLContainer::getPassword); - registry.add("spring.datasource.username", postgreSQLContainer::getUsername); - - registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); - } - - @Autowired - private KafkaTemplate kafkaTemplate; +public class OrderShippingProcessorIT extends AbstractIntegrationTest { - - @Autowired - private OrderShippingProcessor orderShippingProcessor; - - @Autowired - private ShippingEventKafkaPublisher shippingEventKafkaPublisher; - - @Autowired - private DurableLocalCommandBus commandBus; - - @Autowired - private DurableQueues durableQueues; - - @Autowired - ConsumerFactory kafkaConsumerFactory; - - private List> shippingRecordsReceived; + private static final Logger log = LoggerFactory.getLogger(OrderShippingProcessorIT.class); @BeforeEach void setup() { diff --git a/postgresql-inbox-outbox/src/test/resources/logback-test.xml b/postgresql-inbox-outbox/src/test/resources/logback-test.xml index a7bd723..5c842d0 100644 --- a/postgresql-inbox-outbox/src/test/resources/logback-test.xml +++ b/postgresql-inbox-outbox/src/test/resources/logback-test.xml @@ -24,15 +24,28 @@ - + + + + + + + - + + + + + + + + From 7c77c00d624658f53fbefcd9e9425c5fde3260c4 Mon Sep 17 00:00:00 2001 From: Jeppe Cramon <96371136+cloudcreate-dk@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:35:10 +0100 Subject: [PATCH 4/5] Updated to essentials 0.40.19 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2ef1c91..17c2f36 100644 --- a/pom.xml +++ b/pom.xml @@ -61,7 +61,7 @@ false - 0.40.18 + 0.40.19 3.2.12 6.1.15 From 40cbe9cf449f519f5aeecf1770b7e58b6014a5e6 Mon Sep 17 00:00:00 2001 From: Jeppe Cramon <96371136+cloudcreate-dk@users.noreply.github.com> Date: Wed, 18 Dec 2024 08:13:59 +0100 Subject: [PATCH 5/5] - Updated from the deprecated `org.testcontainers.containers.KafkaContainer` to `org.testcontainers.kafka.KafkaContainer` - Switched to use the `apache/kafka-native` docker image as required by `org.testcontainers.kafka.KafkaContainer` - Added custom env as recommended in https://github.com/testcontainers/testcontainers-java/issues/9506 - Added `objenesis` dependency where needed to deserialize outbox messages - Adjusted test timeout parameters - Adjusted logging to be less verbose when running the application --- docker-compose.yml | 22 +------------------ mongodb-inbox-outbox/pom.xml | 10 ++++----- .../shipping/OrderShippingProcessorIT.java | 3 ++- .../postgresql/cqrs/banking/AccountsIT.java | 3 ++- .../banking/TransferMoneyProcessorIT.java | 3 ++- .../shipping/OrderShippingProcessorIT.java | 7 +++--- postgresql-inbox-outbox/pom.xml | 10 ++++----- .../src/main/resources/logback-spring.xml | 16 +++++++------- .../messaging/AbstractIntegrationTest.java | 3 ++- .../messaging/DurableQueuesLoadIT.java | 5 +++-- .../LoadOrderShippingProcessorIT.java | 2 +- 11 files changed, 34 insertions(+), 50 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1645a06..3110230 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,31 +24,11 @@ services: - "27017:27017" command: [--replSet, "rs0"] - # From https://developer.confluent.io/quickstart/kafka-docker/ - zookeeper: - image: confluentinc/cp-zookeeper:latest - container_name: zookeeper - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - broker: - image: confluentinc/cp-kafka:latest + image: apache/kafka-native:latest container_name: kafka_broker ports: - # To learn about configuring Kafka for access across networks see - # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ - "9092:9092" - depends_on: - - zookeeper - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # Tracing, Latency and Metric setup is inspired by https://spring.io/blog/2022/10/12/observability-with-spring-boot-3 / https://github.com/marcingrzejszczak/observability-boot-blog-post tempo: diff --git a/mongodb-inbox-outbox/pom.xml b/mongodb-inbox-outbox/pom.xml index 164eaf9..2172a43 100644 --- a/mongodb-inbox-outbox/pom.xml +++ b/mongodb-inbox-outbox/pom.xml @@ -38,11 +38,11 @@ spring-boot-starter-mongodb ${essentials.version} - - - - - + + org.objenesis + objenesis + ${objenesis.version} + org.springframework.boot spring-boot-starter-data-mongodb diff --git a/mongodb-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/mongodb/messaging/shipping/OrderShippingProcessorIT.java b/mongodb-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/mongodb/messaging/shipping/OrderShippingProcessorIT.java index b4ac1cf..abbd8d9 100644 --- a/mongodb-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/mongodb/messaging/shipping/OrderShippingProcessorIT.java +++ b/mongodb-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/mongodb/messaging/shipping/OrderShippingProcessorIT.java @@ -53,7 +53,8 @@ public class OrderShippingProcessorIT { static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:latest"); @Container - static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest") + .withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094"); private KafkaMessageListenerContainer kafkaListenerContainer; @DynamicPropertySource diff --git a/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/banking/AccountsIT.java b/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/banking/AccountsIT.java index 0d54aee..e66c0ef 100644 --- a/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/banking/AccountsIT.java +++ b/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/banking/AccountsIT.java @@ -50,7 +50,8 @@ public class AccountsIT { .withUsername("test"); @Container - static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest") + .withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094"); @DynamicPropertySource static void setProperties(DynamicPropertyRegistry registry) { diff --git a/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/banking/TransferMoneyProcessorIT.java b/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/banking/TransferMoneyProcessorIT.java index 34b3b4e..f13c1ef 100644 --- a/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/banking/TransferMoneyProcessorIT.java +++ b/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/banking/TransferMoneyProcessorIT.java @@ -53,7 +53,8 @@ public class TransferMoneyProcessorIT { .withUsername("test"); @Container - static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest") + .withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094"); @DynamicPropertySource static void setProperties(DynamicPropertyRegistry registry) { diff --git a/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/shipping/OrderShippingProcessorIT.java b/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/shipping/OrderShippingProcessorIT.java index f6e165a..0529266 100644 --- a/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/shipping/OrderShippingProcessorIT.java +++ b/postgresql-cqrs/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/cqrs/shipping/OrderShippingProcessorIT.java @@ -33,11 +33,9 @@ import org.springframework.kafka.listener.*; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.*; -import org.testcontainers.containers.*; -import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.junit.jupiter.*; import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; import java.time.Duration; import java.util.*; @@ -57,7 +55,8 @@ public class OrderShippingProcessorIT { .withUsername("test"); @Container - static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest") + .withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094"); private KafkaMessageListenerContainer kafkaListenerContainer; @DynamicPropertySource diff --git a/postgresql-inbox-outbox/pom.xml b/postgresql-inbox-outbox/pom.xml index a657beb..6e6e77a 100644 --- a/postgresql-inbox-outbox/pom.xml +++ b/postgresql-inbox-outbox/pom.xml @@ -43,11 +43,11 @@ types-springdata-jpa ${essentials.version} - - - - - + + org.objenesis + objenesis + ${objenesis.version} + org.springframework.boot spring-boot-starter-data-jpa diff --git a/postgresql-inbox-outbox/src/main/resources/logback-spring.xml b/postgresql-inbox-outbox/src/main/resources/logback-spring.xml index c80d65e..941856a 100644 --- a/postgresql-inbox-outbox/src/main/resources/logback-spring.xml +++ b/postgresql-inbox-outbox/src/main/resources/logback-spring.xml @@ -37,15 +37,15 @@ - + - - - - - - - + + + + + + + diff --git a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/AbstractIntegrationTest.java b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/AbstractIntegrationTest.java index 5ba2e72..ab4424b 100644 --- a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/AbstractIntegrationTest.java +++ b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/AbstractIntegrationTest.java @@ -46,7 +46,8 @@ public class AbstractIntegrationTest { .withUsername("test"); @Container - static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest") + .withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094"); protected KafkaMessageListenerContainer kafkaListenerContainer; @DynamicPropertySource diff --git a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/DurableQueuesLoadIT.java b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/DurableQueuesLoadIT.java index bee3db5..b8bcb64 100644 --- a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/DurableQueuesLoadIT.java +++ b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/DurableQueuesLoadIT.java @@ -53,7 +53,8 @@ public class DurableQueuesLoadIT { .withPassword("test") .withUsername("test"); @Container - static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest") + .withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094"); @DynamicPropertySource @@ -116,7 +117,7 @@ void queue_a_large_number_of_messages() { assertThat(nextMessages).hasSize(10); - Awaitility.waitAtMost(Duration.ofSeconds(10)) + Awaitility.waitAtMost(Duration.ofSeconds(20)) .untilAsserted(() -> { System.out.println("-----> " + Instant.now() + " messages received: " + msgHandler.messagesReceived.get()); assertThat(msgHandler.messagesReceived.get()).isGreaterThan(10); diff --git a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessorIT.java b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessorIT.java index 2adb1bb..4f48483 100644 --- a/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessorIT.java +++ b/postgresql-inbox-outbox/src/test/java/dk/cloudcreate/essentials/spring/examples/postgresql/messaging/shipping/LoadOrderShippingProcessorIT.java @@ -58,7 +58,7 @@ public void stress_test_durable_queues_and_local_eventbus() { log.debug("########## TotalMessagesQueued for '{}': '{}'", queueName, totalMessagesQueuedFor); var stopWatch = StopWatch.start(); - Awaitility.waitAtMost(Duration.ofMinutes(60)) + Awaitility.waitAtMost(Duration.ofMinutes(3)) .pollDelay(Duration.ofSeconds(5)) .pollInterval(Duration.ofSeconds(10)) .until(() -> {