Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPB 3 #50

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bom/parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
<name>POM / Parent</name>

<properties>
<spring-boot.version>2.7.17</spring-boot.version>
<camunda-7.version>7.19.0</camunda-7.version>
<camunda-bpm-data.version>1.2.8</camunda-bpm-data.version>
<shedlock.version>4.46.0</shedlock.version>
<spring-boot.version>3.1.5</spring-boot.version>
<camunda-7.version>7.20.0</camunda-7.version>
<camunda-bpm-data.version>1.4.0</camunda-bpm-data.version>
<shedlock.version>5.10.0</shedlock.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -73,14 +73,14 @@
<dependency>
<groupId>io.github.microutils</groupId>
<artifactId>kotlin-logging-jvm</artifactId>
<version>3.0.4</version>
<version>3.0.5</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.mockito.kotlin</groupId>
<artifactId>mockito-kotlin</artifactId>
<version>5.0.0</version>
<version>5.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion example/axon/reservation-axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<dependency>
<groupId>io.holunda</groupId>
<artifactId>camunda-platform-7-autologin</artifactId>
<version>0.0.1</version>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.holunda.camunda.bpm.example.axon

import io.holunda.camunda.bpm.data.CamundaBpmData.builder
import io.holunda.camunda.bpm.data.CamundaBpmData.longVariable
import io.holunda.camunda.bpm.data.CamundaBpmDataKotlin.longVariable
import io.holunda.camunda.bpm.data.CamundaBpmDataKotlin.customVariable
import io.holunda.camunda.bpm.data.CamundaBpmDataKotlin.stringVariable
import io.holunda.camunda.bpm.example.axon.ReservationProcessing.Variables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import mu.KLogging
import org.camunda.bpm.engine.RepositoryService
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import javax.annotation.PostConstruct
import jakarta.annotation.PostConstruct

/**
* Configuration for correlation.
Expand Down
1 change: 1 addition & 0 deletions example/itest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>


<!-- Axon Starter -->
<dependency>
<groupId>org.axonframework</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Lazy
import org.springframework.test.context.ActiveProfiles
import org.springframework.test.context.junit.jupiter.SpringExtension

Expand All @@ -19,12 +20,12 @@ import org.springframework.test.context.junit.jupiter.SpringExtension
webEnvironment = SpringBootTest.WebEnvironment.MOCK,
properties = [
// axon-1
"correlate.channels.axon-1.type=axon-event",
"correlate.channels.axon-1.enabled=true",
"correlate.channels.axonOne.type=axon-event",
"correlate.channels.axonOne.enabled=true",
// axon-2
"correlate.channels.axon-2.type=axon-event",
"correlate.channels.axon-2.enabled=true",
"correlate.channels.axon-2.beanName=specified-handler-name",
"correlate.channels.axon-2.beanNamePrefix=specified",
// unknown-type
"correlate.channels.unknown-type.type=unknown-type",
"correlate.channels.unknown-type.enabled=true",
Expand All @@ -38,28 +39,28 @@ import org.springframework.test.context.junit.jupiter.SpringExtension
internal class AxonChannelConfigurationIT {

@Autowired
@Lazy
private lateinit var handlers: Map<String, AxonEventMessageHandler>

@Autowired
@Qualifier("specified-handler-name")
@Qualifier("specifiedConverter")
private lateinit var converter: AxonEventMessageHeaderConverter


@Test
fun configures_two_consumers() {

assertThat(handlers).hasSize(2)
assertThat(handlers.keys).containsExactlyInAnyOrder("axon-1-handler", "specified-handler-name")
assertThat(handlers["axon-1-handler"]!!.channelName).isEqualTo("axon-1")
assertThat(handlers["specified-handler-name"]!!.channelName).isEqualTo("axon-2")
assertThat(handlers["specified-handler-name"]!!.axonEventMessageHeaderConverter).isEqualTo(converter)
assertThat(handlers.keys).containsExactlyInAnyOrder("axonOneHandler", "specifiedHandler")
assertThat(handlers["axonOneHandler"]!!.channelName).isEqualTo("axonOne")
assertThat(handlers["specifiedHandler"]!!.channelName).isEqualTo("axon-2")
assertThat(handlers["specifiedHandler"]!!.axonEventMessageHeaderConverter).isEqualTo(converter)
}

@SpringBootApplication(exclude = [BatchCorrelationSchedulerConfiguration::class])
class TestApplication {
@Bean
@Qualifier("specified-handler-name")
fun qualifiedConverter(): AxonEventMessageHeaderConverter = mock()
@Bean("specifiedConverter")
fun doesNotMatter(): AxonEventMessageHeaderConverter = mock()

@Bean
fun singleMessageCorrelationStrategy(): SingleMessageCorrelationStrategy = mock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,24 @@ import org.springframework.test.context.junit.jupiter.SpringExtension
webEnvironment = SpringBootTest.WebEnvironment.MOCK,
properties = [
// kafka-1
"correlate.channels.kafka-1.type=stream",
"correlate.channels.kafka-1.enabled=true",
"correlate.channels.kafkaOne.type=stream",
"correlate.channels.kafkaOne.enabled=true",
// kafka-2
"correlate.channels.kafka-2.type=stream",
"correlate.channels.kafka-2.enabled=true",
"correlate.channels.kafka-2.beanName=specified-consumer-name",
"correlate.channels.kafkaTwo.type=stream",
"correlate.channels.kafkaTwo.enabled=true",
"correlate.channels.kafkaTwo.beanNamePrefix=specifiedName",
// unknown-type
"correlate.channels.unknown-type.type=unknown-type",
"correlate.channels.unknown-type.enabled=true",
"correlate.channels.unknownType.type=unknown-type",
"correlate.channels.unknownType.enabled=true",
// disabled
"correlate.channels.disabled.type=stream",
"correlate.channels.disabled.enabled=false",

// function declaration
"spring.cloud.stream.function.definition=kafka-1-consumer; specified-consumer-name",
"spring.cloud.stream.function.definition=kafkaOneConsumer; specifiedNameConsumer",
// bindings
"spring.cloud.stream.function.bindings.kafka-1-consumer-in-0=correlate-ingress-binding-1",
"spring.cloud.stream.function.bindings.specified-consumer-name-in-0=correlate-ingress-binding-1",
"spring.cloud.stream.function.bindings.kafkaOneConsumer-in-0=correlate-ingress-binding-1",
"spring.cloud.stream.function.bindings.specifiedNameConsumer-in-0=correlate-ingress-binding-1",

]
)
@ExtendWith(SpringExtension::class)
Expand All @@ -49,27 +49,26 @@ internal class SpringCloudStreamChannelConfigurationIT {
private lateinit var consumers: Map<String, StreamByteMessageConsumer>

@Autowired
@Qualifier("specified-consumer-name")
private lateinit var converter: StreamChannelMessageHeaderConverter

@Qualifier("specifiedNameConverter")
private lateinit var converter: StreamChannelMessageHeaderConverter // this converter will be picked up because of the name of the field

@Test
fun configures_two_consumers() {

assertThat(consumers).hasSize(2)
assertThat(consumers.keys).containsExactlyInAnyOrder("kafka-1-consumer", "specified-consumer-name")
assertThat(consumers["kafka-1-consumer"]!!.channelName).isEqualTo("kafka-1")
assertThat(consumers["specified-consumer-name"]!!.channelName).isEqualTo("kafka-2")
assertThat(consumers["specified-consumer-name"]!!.streamChannelMessageHeaderConverter).isEqualTo(converter)
assertThat(consumers.keys).containsExactlyInAnyOrder("kafkaOneConsumer", "specifiedNameConsumer")
assertThat(consumers["kafkaOneConsumer"]!!.channelName).isEqualTo("kafkaOne")
assertThat(consumers["specifiedNameConsumer"]!!.channelName).isEqualTo("kafkaTwo")
assertThat(consumers["specifiedNameConsumer"]!!.streamChannelMessageHeaderConverter).isEqualTo(converter)
}

@SpringBootApplication(exclude = [BatchCorrelationSchedulerConfiguration::class])
class TestApplication {
@Bean
@Qualifier("specified-consumer-name")

@Bean("specifiedNameConverter")
fun qualifiedConverter(): StreamChannelMessageHeaderConverter = mock()

@Bean
fun singleMessageCorrelationStrategy(): SingleMessageCorrelationStrategy = mock()
}
}
}
9 changes: 9 additions & 0 deletions example/itest/src/test/resources/application-axon-event.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
spring:
autoconfigure:
exclude:
- org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
- org.springframework.cloud.stream.config.ChannelsEndpointAutoConfiguration
- org.springframework.cloud.stream.config.BindingsEndpointAutoConfiguration
- org.springframework.cloud.stream.config.BindingServiceConfiguration
- org.springframework.cloud.stream.binder.kafka.config.ExtendedBindingHandlerMappingsProviderConfiguration
- org.springframework.cloud.stream.function.FunctionConfiguration
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ spring:
brokers: PLAINTEXT://localhost:59092
configuration:
security.protocol: PLAINTEXT

# disabled because of warning in log, not needed?
# consumer-properties:
# key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
Expand Down
8 changes: 1 addition & 7 deletions example/itest/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@ spring:
jpa:
open-in-view: true
show-sql: false
autoconfigure:
exclude:
- org.org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
- org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration
- org.springframework.cloud.stream.binder.kafka.streams.endpoint.KafkaStreamsTopologyEndpointAutoConfiguration
- org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration


axon:
disable-axoniq-console-message: true
axonserver:
enabled: false

Expand Down
9 changes: 5 additions & 4 deletions example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
<packaging>pom</packaging>

<properties>
<spin.version>1.15.0</spin.version>
<spring-cloud.version>2021.0.5</spring-cloud.version>
<axon.framework.version>4.5.12</axon.framework.version>
<spin.version>1.22.0</spin.version>
<spring-cloud.version>2022.0.4</spring-cloud.version>
<axon.framework.version>4.9.0</axon.framework.version>
</properties>

<modules>
Expand Down Expand Up @@ -60,12 +60,13 @@
</dependency>
<!-- FIXME: Remove as soon as spring fixes those issues -->
<!-- https://github.com/spring-cloud/spring-cloud-function/issues/925 -->
<!--
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
<version>3.2.5</version>
</dependency>

-->

<!-- Axon Framework -->
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion example/spring-cloud/reservation-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>io.holunda</groupId>
<artifactId>camunda-platform-7-autologin</artifactId>
<version>0.0.1</version>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.holunda.camunda.bpm.example.kafka

import io.holunda.camunda.bpm.data.CamundaBpmData.builder
import io.holunda.camunda.bpm.data.CamundaBpmData.longVariable
import io.holunda.camunda.bpm.data.CamundaBpmDataKotlin.longVariable
import io.holunda.camunda.bpm.data.CamundaBpmDataKotlin.customVariable
import io.holunda.camunda.bpm.data.CamundaBpmDataKotlin.stringVariable
import io.holunda.camunda.bpm.example.common.domain.ReservationReceivedEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.*
import javax.annotation.PostConstruct
import jakarta.annotation.PostConstruct

/**
* Starts the app.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.camunda.bpm.engine.RepositoryService
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Profile
import javax.annotation.PostConstruct
import jakarta.annotation.PostConstruct

/**
* Configuration to use the library for correlation.
Expand Down
2 changes: 1 addition & 1 deletion extension/axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<name>Extension / Ingress Adapter Axon Framework</name>

<properties>
<axon-framework.version>4.5.12</axon-framework.version>
<axon-framework.version>4.9.0</axon-framework.version>
</properties>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,32 @@ import io.holunda.camunda.bpm.correlate.ingress.ChannelMessageAcceptorConfigurat
import io.holunda.camunda.bpm.correlate.ingress.IngressMetrics
import io.holunda.camunda.bpm.correlate.persist.encoding.PayloadDecoder
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.AutoConfigureAfter
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

/**
* Axon Framework Channel configuration-
* Axon Framework Channel configuration.
*/
@Configuration
@AutoConfigureAfter(AxonAutoConfiguration::class, ChannelMessageAcceptorConfiguration::class)
class AxonChannelConfiguration {

companion object {
const val CHANNEL_TYPE = "axon-event"

const val PROPERTY_CHANNEL_PAYLOAD_ENCODING = "payload-encoding"
const val DEFAULT_MESSAGE_HEADER_CONVERTER_NAME = "axonEventMessageHeaderConverter"
}

/**
* Channel header extractor.
*/
@ConditionalOnMissingBean
@Bean
fun axonEventHeaderExtractor(): AxonEventMessageHeaderConverter = DefaultAxonEventMessageHeaderConverter()
@Bean(DEFAULT_MESSAGE_HEADER_CONVERTER_NAME)
fun axonEventMessageHeaderConverter(): AxonEventMessageHeaderConverter = DefaultAxonEventMessageHeaderConverter()

/**
* Configuration of named channels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.holunda.camunda.bpm.correlate.ingress.ChannelConfigurationProperties
import io.holunda.camunda.bpm.correlate.ingress.ChannelMessageAcceptor
import io.holunda.camunda.bpm.correlate.ingress.IngressMetrics
import io.holunda.camunda.bpm.correlate.ingress.axon.AxonChannelConfiguration.Companion.CHANNEL_TYPE
import io.holunda.camunda.bpm.correlate.ingress.axon.AxonChannelConfiguration.Companion.DEFAULT_MESSAGE_HEADER_CONVERTER_NAME
import io.holunda.camunda.bpm.correlate.ingress.axon.AxonChannelConfiguration.Companion.PROPERTY_CHANNEL_PAYLOAD_ENCODING
import io.holunda.camunda.bpm.correlate.persist.encoding.PayloadDecoder
import io.holunda.camunda.bpm.correlate.util.getQualifiedBeanWithFallback
Expand Down Expand Up @@ -40,23 +41,31 @@ class AxonChannelProxyFactory(
override fun afterPropertiesSet() {
if (this::applicationContext.isInitialized) {
logger.debug { "[Camunda CORRELATE] Creating channel consumers for Axon Event Bus: ${axonEventConfigurations.keys.joinToString(", ")}." }
var refreshRequired = false
axonEventConfigurations.forEach { (name, config) ->

val encoding: String = requireNotNull( getEncoding(config) ) { "Channel encoding is required, please set either globally or for channel." }
val encoder = requireNotNull(payloadDecoders.find { it.supports(encoding) }) { "Could not find decoder for configured message encoding '$encoding'." }

val handlerName = config.beanName ?: "$name-handler"
// lookup named converter or take the default one
val converterName = (config.beanNamePrefix ?: name) + "Converter"
val converter: AxonEventMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(converterName, DEFAULT_MESSAGE_HEADER_CONVERTER_NAME)

// lookup consumer or create one
val handlerName = (config.beanNamePrefix ?: name) + "Handler"
if (!applicationContext.containsBean(handlerName)) {
// the channel is not configured yet.
// the channel handler is not configured yet.
val handler = AxonEventMessageHandler(
messageAcceptor = channelMessageAcceptor,
metrics = metrics,
axonEventMessageHeaderConverter = applicationContext.getQualifiedBeanWithFallback(name),
axonEventMessageHeaderConverter = converter,
encoder = encoder,
channelName = name
)
applicationContext.registerBean(handlerName, AxonEventMessageHandler::class.java, Supplier { handler })
logger.info { "[Camunda CORRELATE] Registered AxonEventMessageHandler for channel '$name' named '$handlerName'." }
} else {
logger.info { "[Camunda CORRELATE] Found a bean '$handlerName', skipping construction." }
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions extension/axon/src/main/resources/META-INF/spring.factories

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
io.holunda.camunda.bpm.correlate.ingress.axon.AxonChannelConfiguration
io.holunda.camunda.bpm.correlate.ingress.axon.AxonHandlerConfiguration
Loading