Skip to content

Commit

Permalink
Add BiFunction support to spring-cloud-stream-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
LeovR committed Nov 20, 2024
1 parent 972784f commit 3f0c3e3
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -30,6 +31,16 @@ public Function<ExamplePayloadDto, AnotherPayloadDto> process() {
};
}

@Bean
public BiFunction<ExamplePayloadDto, Map<String, Object>, AnotherPayloadDto> biProcess() {
return (input, headers) -> {
log.info("Received new message in bifunction-topic: {}. Headers: {}", input, headers);
AnotherPayloadDto output = new AnotherPayloadDto("foo", input);
log.info("Publishing output: {}", output);
return output;
};
}

@Bean
public Consumer<AnotherPayloadDto> consumerMethod() {
return input -> log.info("Received new message in another-topic: {}", input.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ spring.cloud.stream.default-binder=kafka
spring.cloud.stream.binders.kafka.type=kafka
spring.cloud.stream.binders.kafka.environment.spring.kafka.bootstrap-servers=${spring.kafka.bootstrap-servers}
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.function.definition=process;consumerMethod;consumerClass;googlePubSubConsumerMethod;biConsumerMethod
spring.cloud.function.definition=process;consumerMethod;consumerClass;googlePubSubConsumerMethod;biConsumerMethod;biProcess

spring.cloud.stream.bindings.process-in-0.destination=example-topic
spring.cloud.stream.bindings.process-in-0.group=springwolf
spring.cloud.stream.bindings.process-out-0.destination=another-topic

spring.cloud.stream.bindings.biProcess-in-0.destination=bifunction-topic
spring.cloud.stream.bindings.biProcess-in-0.group=springwolf
spring.cloud.stream.bindings.biProcess-out-0.destination=bifunction-output-topic

spring.cloud.stream.bindings.consumerMethod-in-0.destination=consumer-topic
spring.cloud.stream.bindings.consumerMethod-in-0.group=springwolf

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@
"kafka": { }
}
},
"bifunction-output-topic": {
"address": "bifunction-output-topic",
"messages": {
"AnotherPayloadDto": {
"$ref": "#/components/messages/AnotherPayloadDto"
}
},
"bindings": {
"kafka": { }
}
},
"bifunction-topic": {
"address": "bifunction-topic",
"messages": {
"ExamplePayloadDto": {
"$ref": "#/components/messages/ExamplePayloadDto"
}
},
"bindings": {
"kafka": { }
}
},
"consumer-class-topic": {
"address": "consumer-class-topic",
"messages": {
Expand Down Expand Up @@ -304,6 +326,36 @@
}
]
},
"bifunction-output-topic_subscribe_biProcess": {
"action": "send",
"channel": {
"$ref": "#/channels/bifunction-output-topic"
},
"description": "Auto-generated description",
"bindings": {
"kafka": { }
},
"messages": [
{
"$ref": "#/channels/bifunction-output-topic/messages/AnotherPayloadDto"
}
]
},
"bifunction-topic_publish_biProcess": {
"action": "receive",
"channel": {
"$ref": "#/channels/bifunction-topic"
},
"description": "Auto-generated description",
"bindings": {
"kafka": { }
},
"messages": [
{
"$ref": "#/channels/bifunction-topic/messages/ExamplePayloadDto"
}
]
},
"consumer-class-topic_publish_ConsumerClass": {
"action": "receive",
"channel": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -42,6 +43,13 @@ public Set<FunctionalChannelBeanData> build(AnnotatedElement element) {
return Set.of(ofConsumer(element, inputType), ofSupplier(element, outputType));
}

if (BiFunction.class.isAssignableFrom(type)) {
Type inputType = getTypeGenerics(element).get(0);
Type outputType = getTypeGenerics(element).get(2);

return Set.of(ofConsumer(element, inputType), ofSupplier(element, outputType));
}

return Collections.emptySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -340,6 +341,90 @@ void testFunctionBinding() {
Map.entry("test-out-topic_subscribe_testFunction", subscribeOperation));
}

@Test
void testBiFunctionBinding() {
// Given a binding "spring.cloud.stream.bindings.testFunction-in-0.destination=test-in-topic"
// And a binding "spring.cloud.stream.bindings.testFunction-out-0.destination=test-output-topic"
String inputTopicName = "test-in-topic";
BindingProperties testBiFunctionInBinding = new BindingProperties();
testBiFunctionInBinding.setDestination(inputTopicName);

String outputTopicName = "test-out-topic";
BindingProperties testBiFunctionOutBinding = new BindingProperties();
testBiFunctionOutBinding.setDestination(outputTopicName);
when(bindingServiceProperties.getBindings())
.thenReturn(Map.of(
"testBiFunction-in-0", testBiFunctionInBinding,
"testBiFunction-out-0", testBiFunctionOutBinding));

// When scan is called
Map<String, ChannelObject> actualChannels = channelsScanner.scan();
Map<String, Operation> actualOperations = operationsScanner.scan();

// Then the returned channels contain a publish ChannelItem and a subscribe ChannelItem
MessageObject subscribeMessage = MessageObject.builder()
.name(Integer.class.getName())
.title(Integer.class.getSimpleName())
.payload(MessagePayload.of(MultiFormatSchema.builder()
.schema(SchemaReference.fromSchema(Number.class.getSimpleName()))
.build()))
.headers(MessageHeaders.of(
MessageReference.toSchema(AsyncHeadersNotDocumented.NOT_DOCUMENTED.getTitle())))
.bindings(Map.of("kafka", new EmptyMessageBinding()))
.build();

Operation subscribeOperation = Operation.builder()
.bindings(operationBinding)
.action(OperationAction.SEND)
.bindings(operationBinding)
.description("Auto-generated description")
.channel(ChannelReference.fromChannel(outputTopicName))
.messages(List.of(MessageReference.toChannelMessage(outputTopicName, subscribeMessage)))
.build();

ChannelObject subscribeChannel = ChannelObject.builder()
.channelId(outputTopicName)
.address(outputTopicName)
.bindings(channelBinding)
.messages(
Map.of(subscribeMessage.getMessageId(), MessageReference.toComponentMessage(subscribeMessage)))
.build();

MessageObject publishMessage = MessageObject.builder()
.name(String.class.getName())
.title(String.class.getSimpleName())
.payload(MessagePayload.of(MultiFormatSchema.builder()
.schema(SchemaReference.fromSchema(String.class.getName()))
.build()))
.headers(MessageHeaders.of(
MessageReference.toSchema(AsyncHeadersNotDocumented.NOT_DOCUMENTED.getTitle())))
.bindings(Map.of("kafka", new EmptyMessageBinding()))
.build();

Operation publishOperation = Operation.builder()
.bindings(operationBinding)
.action(OperationAction.RECEIVE)
.bindings(operationBinding)
.description("Auto-generated description")
.channel(ChannelReference.fromChannel(inputTopicName))
.messages(List.of(MessageReference.toChannelMessage(inputTopicName, publishMessage)))
.build();

ChannelObject publishChannel = ChannelObject.builder()
.channelId(inputTopicName)
.address(inputTopicName)
.bindings(channelBinding)
.messages(Map.of(publishMessage.getMessageId(), MessageReference.toComponentMessage(publishMessage)))
.build();

assertThat(actualChannels)
.contains(Map.entry(inputTopicName, publishChannel), Map.entry(outputTopicName, subscribeChannel));
assertThat(actualOperations)
.contains(
Map.entry("test-in-topic_publish_testBiFunction", publishOperation),
Map.entry("test-out-topic_subscribe_testBiFunction", subscribeOperation));
}

@Test
void testKStreamFunctionBinding() {
// Given a binding "spring.cloud.stream.bindings.kStreamTestFunction-in-0.destination=test-in-topic"
Expand Down Expand Up @@ -530,6 +615,11 @@ public Function<String, Integer> testFunction() {
return s -> 1;
}

@Bean
public BiFunction<String, Map<String, Object>, Integer> testBiFunction() {
return (value, headers) -> 1;
}

@Bean
public Function<KStream<Void, String>, KStream<Void, Integer>> kStreamTestFunction() {
return stream -> stream.mapValues(s -> 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -64,6 +67,25 @@ private Consumer<String> consumerBean() {
}
}

@Nested
class BiConsumerBean {
@Test
void testBiConsumerBean() throws NoSuchMethodException {
Method method = getMethod(this.getClass(), "biConsumerBean");

Set<FunctionalChannelBeanData> data = functionalChannelBeanBuilder.build(method);

Assertions.assertThat(data)
.containsExactly(new FunctionalChannelBeanData(
"biConsumerBean", method, String.class, CONSUMER, "biConsumerBean-in-0"));
}

@Bean
private BiConsumer<String, Map<String, Object>> biConsumerBean() {
return (input, headers) -> System.out.println(input);
}
}

@Nested
class SupplierBean {
@Test
Expand Down Expand Up @@ -105,6 +127,28 @@ private Function<String, Integer> functionBean() {
}
}

@Nested
class BiFunctionBean {
@Test
void testBiFunctionBean() throws NoSuchMethodException {
Method method = getMethod(this.getClass(), "biFunctionBean");

Set<FunctionalChannelBeanData> data = functionalChannelBeanBuilder.build(method);

Assertions.assertThat(data)
.containsExactlyInAnyOrder(
new FunctionalChannelBeanData(
"biFunctionBean", method, String.class, CONSUMER, "biFunctionBean-in-0"),
new FunctionalChannelBeanData(
"biFunctionBean", method, Integer.class, SUPPLIER, "biFunctionBean-out-0"));
}

@Bean
private BiFunction<String, Map<String, Object>, Integer> biFunctionBean() {
return (s, headers) -> 1;
}
}

@Nested
class ConsumerBeanWithGenericPayload {

Expand Down

0 comments on commit 3f0c3e3

Please sign in to comment.