Skip to content

Commit

Permalink
feat(kafka): prepare to use different kafka templates per topic durin…
Browse files Browse the repository at this point in the history
…g publishing (#518)
  • Loading branch information
timonback authored Jan 5, 2024
1 parent def8ee5 commit 56675ae
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController;
import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFactory;
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFromProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
Expand All @@ -30,14 +30,15 @@ public SpringwolfKafkaController springwolfKafkaController(

@Bean
@ConditionalOnMissingBean
public SpringwolfKafkaProducer springwolfKafkaProducer(SpringwolfKafkaTemplateFactory producerTemplateFactory) {
return new SpringwolfKafkaProducer(producerTemplateFactory.buildKafkaTemplate());
public SpringwolfKafkaProducer springwolfKafkaProducer(
SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties) {
return new SpringwolfKafkaProducer(springwolfKafkaTemplateFromProperties);
}

@Bean
@ConditionalOnMissingBean
public SpringwolfKafkaTemplateFactory springwolfKafkaTemplateFactory(
public SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties(
SpringwolfKafkaConfigProperties springwolfKafkaConfigProperties) {
return new SpringwolfKafkaTemplateFactory(springwolfKafkaConfigProperties);
return new SpringwolfKafkaTemplateFromProperties(springwolfKafkaConfigProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@
@RequiredArgsConstructor
public class SpringwolfKafkaProducer {

private final Optional<KafkaTemplate<Object, Object>> kafkaTemplate;
private final SpringwolfKafkaTemplateProvider kafkaTemplateProvider;

public boolean isEnabled() {
return kafkaTemplate.isPresent();
return kafkaTemplateProvider.isPresent();
}

public void send(String topic, String key, Map<String, String> headers, Object payload) {
Optional<KafkaTemplate<Object, Object>> kafkaTemplate = kafkaTemplateProvider.get(topic);
if (kafkaTemplate.isPresent()) {
kafkaTemplate
.get()
.send(buildProducerRecord(topic, key, headers, payload))
.toCompletableFuture()
.join();
} else {
log.warn("Kafka producer is not configured");
log.warn("Kafka producer for topic %s is not configured".formatted(topic));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@

@RequiredArgsConstructor
@ConditionalOnBean(value = SpringwolfKafkaProducerConfiguration.class)
public class SpringwolfKafkaTemplateFactory {
public class SpringwolfKafkaTemplateFromProperties implements SpringwolfKafkaTemplateProvider {

private final SpringwolfKafkaConfigProperties springWolfKafkaConfigProperties;

public Optional<KafkaTemplate<Object, Object>> buildKafkaTemplate() {
Optional<KafkaTemplate<Object, Object>> kafkaTemplate = Optional.empty();
private final Optional<KafkaTemplate<Object, Object>> kafkaTemplate;

public SpringwolfKafkaTemplateFromProperties(SpringwolfKafkaConfigProperties springWolfKafkaConfigProperties) {
if (springWolfKafkaConfigProperties.getPublishing() != null
&& springWolfKafkaConfigProperties.getPublishing().getProducer() != null) {
Map<String, Object> producerProperties = springWolfKafkaConfigProperties
Expand All @@ -30,8 +28,18 @@ public Optional<KafkaTemplate<Object, Object>> buildKafkaTemplate() {
DefaultKafkaProducerFactory<Object, Object> producerFactory =
new DefaultKafkaProducerFactory<>(producerProperties);
kafkaTemplate = Optional.of(new KafkaTemplate<>(producerFactory));
} else {
kafkaTemplate = Optional.empty();
}
}

@Override
public boolean isPresent() {
return kafkaTemplate.isPresent();
}

@Override
public Optional<KafkaTemplate<Object, Object>> get(String topic) {
return kafkaTemplate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.stavshamir.springwolf.producer;

import org.springframework.kafka.core.KafkaTemplate;

import java.util.Optional;

public interface SpringwolfKafkaTemplateProvider {
/**
* Check if publishing in general is possible
*
* @return true if at least in one case a kafka template is present
*/
boolean isPresent();

/**
* Returns the matching kafka template for the topic
*/
Optional<KafkaTemplate<Object, Object>> get(String topic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -32,19 +33,23 @@ class SpringwolfKafkaProducerTest {
private SpringwolfKafkaProducer springwolfKafkaProducer;

@Mock
KafkaTemplate<Object, Object> kafkaTemplate;
private SpringwolfKafkaTemplateProvider kafkaTemplateProvider;

@Mock
private KafkaTemplate<Object, Object> kafkaTemplate;

@Captor
private ArgumentCaptor<ProducerRecord<Object, Object>> recordArgumentCaptor;

@BeforeEach
void setUp() {
springwolfKafkaProducer = new SpringwolfKafkaProducer(Optional.of(kafkaTemplate));
springwolfKafkaProducer = new SpringwolfKafkaProducer(kafkaTemplateProvider);
}

@Test
void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateConfigured() {
Optional<KafkaTemplate<Object, Object>> kafkaTemplateMock = Optional.empty();
SpringwolfKafkaTemplateProvider kafkaTemplateMock = mock();
when(kafkaTemplateMock.isPresent()).thenReturn(false);

springwolfKafkaProducer = new SpringwolfKafkaProducer(kafkaTemplateMock);

Expand All @@ -54,15 +59,20 @@ void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateConfigured
@Test
@SuppressWarnings("unchecked")
void testSendingKafkaMessageWithoutHeaders() {
// given
when(kafkaTemplateProvider.get(any())).thenReturn(Optional.of(kafkaTemplate));

CompletableFuture<SendResult<Object, Object>> future = new CompletableFuture<>();
when(kafkaTemplate.send(ArgumentMatchers.<ProducerRecord<Object, Object>>any()))
.thenReturn(future);
future.complete(mock(SendResult.class));

Map<String, Object> payload = Collections.singletonMap("some", "field");

// when
springwolfKafkaProducer.send("test-topic", null, null, payload);

// then
verify(kafkaTemplate).send(recordArgumentCaptor.capture());

ProducerRecord<Object, Object> capturedRecord = recordArgumentCaptor.getValue();
Expand All @@ -78,6 +88,9 @@ void testSendingKafkaMessageWithoutHeaders() {
@Test
@SuppressWarnings("unchecked")
void testSendingKafkaMessageWithHeaders() {
// given
when(kafkaTemplateProvider.get(any())).thenReturn(Optional.of(kafkaTemplate));

CompletableFuture<SendResult<Object, Object>> future = new CompletableFuture<>();
when(kafkaTemplate.send(ArgumentMatchers.<ProducerRecord<Object, Object>>any()))
.thenReturn(future);
Expand All @@ -86,8 +99,10 @@ void testSendingKafkaMessageWithHeaders() {
Map<String, Object> payload = Collections.singletonMap("some", "field");
Map<String, String> headers = Collections.singletonMap("header-key", "header");

// when
springwolfKafkaProducer.send("test-topic", null, headers, payload);

// then
verify(kafkaTemplate).send(recordArgumentCaptor.capture());

ProducerRecord<Object, Object> capturedRecord = recordArgumentCaptor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,61 @@
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.ssl.DefaultSslBundleRegistry;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.Map;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

class SpringwolfKafkaTemplateFactoryTest {
class SpringwolfKafkaTemplateFromPropertiesTest {

@Test
void kafkaTemplateShouldNotBeCreatedForEmptyProperties() {
// given
SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties();

SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties);
// when
SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider =
new SpringwolfKafkaTemplateFromProperties(configProperties);

Optional<KafkaTemplate<Object, Object>> kafkaTemplate = templateFactory.buildKafkaTemplate();

assertThat(kafkaTemplate).isNotPresent();
// then
assertThat(kafkaTemplateProvider.isPresent()).isFalse();
assertThat(kafkaTemplateProvider.get("topic")).isNotPresent();
}

@Test
void kafkaTemplateShouldNotBeCreatedForEmptyProducerConfiguration() {
// given
SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties();
configProperties.setPublishing(new SpringwolfKafkaConfigProperties.Publishing());

SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties);

Optional<KafkaTemplate<Object, Object>> kafkaTemplate = templateFactory.buildKafkaTemplate();
// when
SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider =
new SpringwolfKafkaTemplateFromProperties(configProperties);

assertThat(kafkaTemplate).isNotPresent();
// then
assertThat(kafkaTemplateProvider.isPresent()).isFalse();
assertThat(kafkaTemplateProvider.get("topic")).isNotPresent();
}

@Test
void kafkaTemplateShouldBeCreatedWithProducerConfiguration() {
// given
SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties();
SpringwolfKafkaConfigProperties.Publishing publishing = new SpringwolfKafkaConfigProperties.Publishing();
publishing.setEnabled(true);
publishing.setProducer(new KafkaProperties.Producer());

configProperties.setPublishing(publishing);

SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties);
// when
SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider =
new SpringwolfKafkaTemplateFromProperties(configProperties);

Optional<KafkaTemplate<Object, Object>> kafkaTemplate = templateFactory.buildKafkaTemplate();
// then
assertThat(kafkaTemplateProvider.isPresent()).isTrue();

assertThat(kafkaTemplate).isPresent();
Map<String, Object> configurationProperties =
kafkaTemplate.get().getProducerFactory().getConfigurationProperties();

kafkaTemplateProvider.get("topic").get().getProducerFactory().getConfigurationProperties();
assertThat(configurationProperties)
.isEqualTo(new KafkaProperties.Producer().buildProperties(new DefaultSslBundleRegistry()));
}
Expand Down

0 comments on commit 56675ae

Please sign in to comment.