Skip to content

Commit

Permalink
#59 - Add support for multiple producers with the same channel name
Browse files Browse the repository at this point in the history
  • Loading branch information
stavshamir committed May 6, 2022
1 parent 20d6357 commit f1cc2cc
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.asyncapi.v2.binding.OperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
Expand All @@ -12,9 +13,12 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.stream.Collectors.toMap;
import static io.github.stavshamir.springwolf.asyncapi.Constants.ONE_OF;
import static java.util.stream.Collectors.*;

@Slf4j
@RequiredArgsConstructor
Expand All @@ -26,9 +30,12 @@ public class ProducerChannelScanner implements ChannelsScanner {

@Override
public Map<String, ChannelItem> scan() {
return docket.getProducers().stream()
Map<String, List<ProducerData>> producerDataGroupedByChannelName = docket.getProducers().stream()
.filter(this::allFieldsAreNonNull)
.collect(toMap(ProducerData::getChannelName, this::buildChannel));
.collect(groupingBy(ProducerData::getChannelName));

return producerDataGroupedByChannelName.entrySet().stream()
.collect(toMap(Map.Entry::getKey, entry -> buildChannel(entry.getValue())));
}

private boolean allFieldsAreNonNull(ProducerData producerData) {
Expand All @@ -43,26 +50,40 @@ private boolean allFieldsAreNonNull(ProducerData producerData) {
return allNonNull;
}

private ChannelItem buildChannel(ProducerData producerData) {
Class<?> payloadType = producerData.getPayloadType();
Map<String, ? extends OperationBinding> operationBinding = producerData.getBinding();

String modelName = schemasService.register(payloadType);

Message message = Message.builder()
.name(payloadType.getName())
.title(modelName)
.payload(PayloadReference.fromModelName(modelName))
.build();
private ChannelItem buildChannel(List<ProducerData> producerDataList) {
// All bindings in the group are assumed to be the same
// AsyncApi does not support multiple bindings on a single channel
Map<String, ? extends OperationBinding> binding = producerDataList.get(0).getBinding();

Operation operation = Operation.builder()
.message(message)
.bindings(operationBinding)
.message(getMessageObject(producerDataList))
.bindings(binding)
.build();

return ChannelItem.builder()
.subscribe(operation)
.build();
}

private Object getMessageObject(List<ProducerData> producerDataList) {
Set<Message> messages = producerDataList.stream()
.map(this::buildMessage)
.collect(toSet());

return messages.size() == 1
? messages.toArray()[0]
: ImmutableMap.of(ONE_OF, messages);
}

private Message buildMessage(ProducerData producerData) {
Class<?> payloadType = producerData.getPayloadType();
String modelName = schemasService.register(payloadType);

return Message.builder()
.name(payloadType.getName())
.title(modelName)
.payload(PayloadReference.fromModelName(modelName))
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import com.asyncapi.v2.binding.kafka.KafkaOperationBinding;
import com.asyncapi.v2.model.channel.ChannelItem;
import com.asyncapi.v2.model.channel.operation.Operation;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.Message;
import io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.schemas.DefaultSchemasService;
import org.junit.Test;
Expand All @@ -15,7 +19,9 @@
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Map;
import java.util.Set;

import static io.github.stavshamir.springwolf.asyncapi.Constants.ONE_OF;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -47,6 +53,22 @@ public void allFieldsProducerData() {
// Then the channel should be created correctly
assertThat(producerChannels)
.containsKey(channelName);

Operation operation = Operation.builder()
.bindings(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.message(Message.builder()
.name(ExamplePayloadDto.class.getName())
.title(ExamplePayloadDto.class.getSimpleName())
.payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName()))
.build())
.build();

ChannelItem expectedChannel = ChannelItem.builder()
.subscribe(operation)
.build();

assertThat(producerChannels.get(channelName))
.isEqualTo(expectedChannel);
}

@Test
Expand All @@ -65,8 +87,66 @@ public void missingFieldProducerData() {
// Then the channel is not created, and no exception is thrown
assertThat(producerChannels).isEmpty();
}

@Test
public void multipleProducersForSameTopic() {
// Given a multiple ProducerData objects for the same topic
String channelName = "example-producer-topic";

ProducerData producerData1 = ProducerData.builder()
.channelName(channelName)
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(ExamplePayloadDto.class)
.build();

ProducerData producerData2 = ProducerData.builder()
.channelName(channelName)
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(AnotherExamplePayloadDto.class)
.build();

when(asyncApiDocket.getProducers()).thenReturn(ImmutableList.of(producerData1, producerData2));

// When scanning for producers
Map<String, ChannelItem> producerChannels = scanner.scan();

// Then one channel is created for the ProducerData objects with multiple messages
assertThat(producerChannels)
.hasSize(1)
.containsKey(channelName);

Set<Message> messages = ImmutableSet.of(
Message.builder()
.name(ExamplePayloadDto.class.getName())
.title(ExamplePayloadDto.class.getSimpleName())
.payload(PayloadReference.fromModelName(ExamplePayloadDto.class.getSimpleName()))
.build(),
Message.builder()
.name(AnotherExamplePayloadDto.class.getName())
.title(AnotherExamplePayloadDto.class.getSimpleName())
.payload(PayloadReference.fromModelName(AnotherExamplePayloadDto.class.getSimpleName()))
.build()
);

Operation operation = Operation.builder()
.bindings(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.message(ImmutableMap.of(ONE_OF, messages))
.build();

ChannelItem expectedChannel = ChannelItem.builder()
.subscribe(operation)
.build();

assertThat(producerChannels.get(channelName))
.isEqualTo(expectedChannel);
}

static class ExamplePayloadDto {
private String foo;
}

static class AnotherExamplePayloadDto {
private String bar;
}

}
4 changes: 2 additions & 2 deletions springwolf-examples/springwolf-kafka-example/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

sourceCompatibility = '1.8'
version '0.4.0'
version '0.5.0'


repositories {
Expand All @@ -23,7 +23,7 @@ repositories {

dependencies {
implementation project(":springwolf-plugins:springwolf-kafka-plugin")
runtimeOnly 'io.github.springwolf:springwolf-ui:0.3.1'
runtimeOnly 'io.github.springwolf:springwolf-ui:0.4.0'

implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
app:
image: stavshamir/springwolf-kafka-example:0.4.0
image: stavshamir/springwolf-kafka-example:0.5.0
links:
- kafka
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import com.asyncapi.v2.model.server.Server;
import com.google.common.collect.ImmutableMap;
import io.github.stavshamir.springwolf.asyncapi.types.ProducerData;
import io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto;
import io.github.stavshamir.springwolf.configuration.AsyncApiDocket;
import io.github.stavshamir.springwolf.configuration.EnableAsyncApi;
import io.github.stavshamir.springwolf.example.dtos.AnotherPayloadDto;
import io.github.stavshamir.springwolf.example.dtos.ExamplePayloadDto;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static io.github.stavshamir.springwolf.example.configuration.KafkaConfiguration.PRODUCER_TOPIC;

@Configuration
@EnableAsyncApi
public class AsyncApiConfiguration {
Expand All @@ -29,20 +32,24 @@ public AsyncApiDocket asyncApiDocket() {
.title("Springwolf example project")
.build();

ProducerData exampleProducerData = ProducerData.builder()
.channelName("example-producer-topic")
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(ExamplePayloadDto.class)
.build();
ProducerData exampleProducerData = buildKafkaProducerData(PRODUCER_TOPIC, ExamplePayloadDto.class);
ProducerData anotherProducerData = buildKafkaProducerData(PRODUCER_TOPIC, AnotherPayloadDto.class);

return AsyncApiDocket.builder()
.basePackage("io.github.stavshamir.springwolf.example.consumers")
.info(info)
.server("kafka", Server.builder().protocol("kafka").url(BOOTSTRAP_SERVERS).build())
.producer(exampleProducerData)
.producer(anotherProducerData)
.build();
}


private ProducerData buildKafkaProducerData(String topic, Class<?> payload) {
return ProducerData.builder()
.channelName(topic)
.binding(ImmutableMap.of("kafka", new KafkaOperationBinding()))
.payloadType(payload)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
@EnableKafka
public class KafkaConfiguration {

public final static String PRODUCER_TOPIC = "example-producer-topic";
private final String BOOTSTRAP_SERVERS;

public KafkaConfiguration(@Value("${kafka.bootstrap.servers}") String bootstrapServers) {
Expand Down Expand Up @@ -66,15 +67,22 @@ public ConcurrentKafkaListenerContainerFactory<String, AnotherPayloadDto> anothe
return containerFactory;
}

private Map<String, Object> producerConfiguration() {
return ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
);
}

@Bean
public KafkaTemplate<String, ExamplePayloadDto> examplePayloadKafkaTemplate() {
Map<String, Object> configuration = ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfiguration()));
}

return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configuration));
@Bean
public KafkaTemplate<String, AnotherPayloadDto> anotherPayloadKafkaTemplate() {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfiguration()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.github.stavshamir.springwolf.example.producers;

import io.github.stavshamir.springwolf.example.dtos.AnotherPayloadDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import static io.github.stavshamir.springwolf.example.configuration.KafkaConfiguration.PRODUCER_TOPIC;

@Component
public class AnotherProducer {

@Autowired
private KafkaTemplate<String, AnotherPayloadDto> kafkaTemplate;

public void sendMessage(AnotherPayloadDto msg) {
kafkaTemplate.send(PRODUCER_TOPIC, msg);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import static io.github.stavshamir.springwolf.example.configuration.KafkaConfiguration.PRODUCER_TOPIC;

@Component
public class ExampleProducer {

@Autowired
private KafkaTemplate<String, ExamplePayloadDto> kafkaTemplate;

public void sendMessage(ExamplePayloadDto msg) {
kafkaTemplate.send("example-producer-topic", msg);
kafkaTemplate.send(PRODUCER_TOPIC, msg);
}

}
Loading

0 comments on commit f1cc2cc

Please sign in to comment.