diff --git a/springwolf-examples/springwolf-cloud-stream-example/src/main/java/io/github/springwolf/examples/cloudstream/configuration/CloudstreamConfiguration.java b/springwolf-examples/springwolf-cloud-stream-example/src/main/java/io/github/springwolf/examples/cloudstream/configuration/CloudstreamConfiguration.java index 34833b2a6..75a9469d6 100644 --- a/springwolf-examples/springwolf-cloud-stream-example/src/main/java/io/github/springwolf/examples/cloudstream/configuration/CloudstreamConfiguration.java +++ b/springwolf-examples/springwolf-cloud-stream-example/src/main/java/io/github/springwolf/examples/cloudstream/configuration/CloudstreamConfiguration.java @@ -14,6 +14,7 @@ import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -30,6 +31,16 @@ public Function process() { }; } + @Bean + public BiFunction, AnotherPayloadDto> biProcess() { + return (input, headers) -> { + log.info("Received new message in bifunction-topic: {}. Headers: {}", input, headers); + AnotherPayloadDto output = new AnotherPayloadDto("foo", input); + log.info("Publishing output: {}", output); + return output; + }; + } + @Bean public Consumer consumerMethod() { return input -> log.info("Received new message in another-topic: {}", input.toString()); diff --git a/springwolf-examples/springwolf-cloud-stream-example/src/main/resources/application.properties b/springwolf-examples/springwolf-cloud-stream-example/src/main/resources/application.properties index b0d8277c8..1583f29fc 100644 --- a/springwolf-examples/springwolf-cloud-stream-example/src/main/resources/application.properties +++ b/springwolf-examples/springwolf-cloud-stream-example/src/main/resources/application.properties @@ -10,12 +10,16 @@ spring.cloud.stream.default-binder=kafka spring.cloud.stream.binders.kafka.type=kafka spring.cloud.stream.binders.kafka.environment.spring.kafka.bootstrap-servers=${spring.kafka.bootstrap-servers} spring.cloud.stream.kafka.binder.autoCreateTopics=true -spring.cloud.function.definition=process;consumerMethod;consumerClass;googlePubSubConsumerMethod;biConsumerMethod +spring.cloud.function.definition=process;consumerMethod;consumerClass;googlePubSubConsumerMethod;biConsumerMethod;biProcess spring.cloud.stream.bindings.process-in-0.destination=example-topic spring.cloud.stream.bindings.process-in-0.group=springwolf spring.cloud.stream.bindings.process-out-0.destination=another-topic +spring.cloud.stream.bindings.biProcess-in-0.destination=bifunction-topic +spring.cloud.stream.bindings.biProcess-in-0.group=springwolf +spring.cloud.stream.bindings.biProcess-out-0.destination=bifunction-output-topic + spring.cloud.stream.bindings.consumerMethod-in-0.destination=consumer-topic spring.cloud.stream.bindings.consumerMethod-in-0.group=springwolf diff --git a/springwolf-examples/springwolf-cloud-stream-example/src/test/resources/asyncapi.json b/springwolf-examples/springwolf-cloud-stream-example/src/test/resources/asyncapi.json index cb9456861..b5206b668 100644 --- a/springwolf-examples/springwolf-cloud-stream-example/src/test/resources/asyncapi.json +++ b/springwolf-examples/springwolf-cloud-stream-example/src/test/resources/asyncapi.json @@ -44,6 +44,28 @@ "kafka": { } } }, + "bifunction-output-topic": { + "address": "bifunction-output-topic", + "messages": { + "AnotherPayloadDto": { + "$ref": "#/components/messages/AnotherPayloadDto" + } + }, + "bindings": { + "kafka": { } + } + }, + "bifunction-topic": { + "address": "bifunction-topic", + "messages": { + "ExamplePayloadDto": { + "$ref": "#/components/messages/ExamplePayloadDto" + } + }, + "bindings": { + "kafka": { } + } + }, "consumer-class-topic": { "address": "consumer-class-topic", "messages": { @@ -304,6 +326,36 @@ } ] }, + "bifunction-output-topic_subscribe_biProcess": { + "action": "send", + "channel": { + "$ref": "#/channels/bifunction-output-topic" + }, + "description": "Auto-generated description", + "bindings": { + "kafka": { } + }, + "messages": [ + { + "$ref": "#/channels/bifunction-output-topic/messages/AnotherPayloadDto" + } + ] + }, + "bifunction-topic_publish_biProcess": { + "action": "receive", + "channel": { + "$ref": "#/channels/bifunction-topic" + }, + "description": "Auto-generated description", + "bindings": { + "kafka": { } + }, + "messages": [ + { + "$ref": "#/channels/bifunction-topic/messages/ExamplePayloadDto" + } + ] + }, "consumer-class-topic_publish_ConsumerClass": { "action": "receive", "channel": { diff --git a/springwolf-plugins/springwolf-cloud-stream-plugin/src/main/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilder.java b/springwolf-plugins/springwolf-cloud-stream-plugin/src/main/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilder.java index 499c06307..cbd76a187 100644 --- a/springwolf-plugins/springwolf-cloud-stream-plugin/src/main/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilder.java +++ b/springwolf-plugins/springwolf-cloud-stream-plugin/src/main/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilder.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -42,6 +43,13 @@ public Set build(AnnotatedElement element) { return Set.of(ofConsumer(element, inputType), ofSupplier(element, outputType)); } + if (BiFunction.class.isAssignableFrom(type)) { + Type inputType = getTypeGenerics(element).get(0); + Type outputType = getTypeGenerics(element).get(2); + + return Set.of(ofConsumer(element, inputType), ofSupplier(element, outputType)); + } + return Collections.emptySet(); } diff --git a/springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/channels/CloudStreamFunctionChannelsScannerIntegrationTest.java b/springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/channels/CloudStreamFunctionChannelsScannerIntegrationTest.java index 597a5bce9..5dafa1407 100644 --- a/springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/channels/CloudStreamFunctionChannelsScannerIntegrationTest.java +++ b/springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/channels/CloudStreamFunctionChannelsScannerIntegrationTest.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -340,6 +341,90 @@ void testFunctionBinding() { Map.entry("test-out-topic_subscribe_testFunction", subscribeOperation)); } + @Test + void testBiFunctionBinding() { + // Given a binding "spring.cloud.stream.bindings.testFunction-in-0.destination=test-in-topic" + // And a binding "spring.cloud.stream.bindings.testFunction-out-0.destination=test-output-topic" + String inputTopicName = "test-in-topic"; + BindingProperties testBiFunctionInBinding = new BindingProperties(); + testBiFunctionInBinding.setDestination(inputTopicName); + + String outputTopicName = "test-out-topic"; + BindingProperties testBiFunctionOutBinding = new BindingProperties(); + testBiFunctionOutBinding.setDestination(outputTopicName); + when(bindingServiceProperties.getBindings()) + .thenReturn(Map.of( + "testBiFunction-in-0", testBiFunctionInBinding, + "testBiFunction-out-0", testBiFunctionOutBinding)); + + // When scan is called + Map actualChannels = channelsScanner.scan(); + Map actualOperations = operationsScanner.scan(); + + // Then the returned channels contain a publish ChannelItem and a subscribe ChannelItem + MessageObject subscribeMessage = MessageObject.builder() + .name(Integer.class.getName()) + .title(Integer.class.getSimpleName()) + .payload(MessagePayload.of(MultiFormatSchema.builder() + .schema(SchemaReference.fromSchema(Number.class.getSimpleName())) + .build())) + .headers(MessageHeaders.of( + MessageReference.toSchema(AsyncHeadersNotDocumented.NOT_DOCUMENTED.getTitle()))) + .bindings(Map.of("kafka", new EmptyMessageBinding())) + .build(); + + Operation subscribeOperation = Operation.builder() + .bindings(operationBinding) + .action(OperationAction.SEND) + .bindings(operationBinding) + .description("Auto-generated description") + .channel(ChannelReference.fromChannel(outputTopicName)) + .messages(List.of(MessageReference.toChannelMessage(outputTopicName, subscribeMessage))) + .build(); + + ChannelObject subscribeChannel = ChannelObject.builder() + .channelId(outputTopicName) + .address(outputTopicName) + .bindings(channelBinding) + .messages( + Map.of(subscribeMessage.getMessageId(), MessageReference.toComponentMessage(subscribeMessage))) + .build(); + + MessageObject publishMessage = MessageObject.builder() + .name(String.class.getName()) + .title(String.class.getSimpleName()) + .payload(MessagePayload.of(MultiFormatSchema.builder() + .schema(SchemaReference.fromSchema(String.class.getName())) + .build())) + .headers(MessageHeaders.of( + MessageReference.toSchema(AsyncHeadersNotDocumented.NOT_DOCUMENTED.getTitle()))) + .bindings(Map.of("kafka", new EmptyMessageBinding())) + .build(); + + Operation publishOperation = Operation.builder() + .bindings(operationBinding) + .action(OperationAction.RECEIVE) + .bindings(operationBinding) + .description("Auto-generated description") + .channel(ChannelReference.fromChannel(inputTopicName)) + .messages(List.of(MessageReference.toChannelMessage(inputTopicName, publishMessage))) + .build(); + + ChannelObject publishChannel = ChannelObject.builder() + .channelId(inputTopicName) + .address(inputTopicName) + .bindings(channelBinding) + .messages(Map.of(publishMessage.getMessageId(), MessageReference.toComponentMessage(publishMessage))) + .build(); + + assertThat(actualChannels) + .contains(Map.entry(inputTopicName, publishChannel), Map.entry(outputTopicName, subscribeChannel)); + assertThat(actualOperations) + .contains( + Map.entry("test-in-topic_publish_testBiFunction", publishOperation), + Map.entry("test-out-topic_subscribe_testBiFunction", subscribeOperation)); + } + @Test void testKStreamFunctionBinding() { // Given a binding "spring.cloud.stream.bindings.kStreamTestFunction-in-0.destination=test-in-topic" @@ -530,6 +615,11 @@ public Function testFunction() { return s -> 1; } + @Bean + public BiFunction, Integer> testBiFunction() { + return (value, headers) -> 1; + } + @Bean public Function, KStream> kStreamTestFunction() { return stream -> stream.mapValues(s -> 1); diff --git a/springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilderTest.java b/springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilderTest.java index f9de22b60..70a2629de 100644 --- a/springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilderTest.java +++ b/springwolf-plugins/springwolf-cloud-stream-plugin/src/test/java/io/github/springwolf/plugins/cloudstream/asyncapi/scanners/common/FunctionalChannelBeanBuilderTest.java @@ -12,7 +12,10 @@ import java.lang.reflect.Method; import java.util.Arrays; +import java.util.Map; import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -64,6 +67,25 @@ private Consumer consumerBean() { } } + @Nested + class BiConsumerBean { + @Test + void testBiConsumerBean() throws NoSuchMethodException { + Method method = getMethod(this.getClass(), "biConsumerBean"); + + Set data = functionalChannelBeanBuilder.build(method); + + Assertions.assertThat(data) + .containsExactly(new FunctionalChannelBeanData( + "biConsumerBean", method, String.class, CONSUMER, "biConsumerBean-in-0")); + } + + @Bean + private BiConsumer> biConsumerBean() { + return (input, headers) -> System.out.println(input); + } + } + @Nested class SupplierBean { @Test @@ -105,6 +127,28 @@ private Function functionBean() { } } + @Nested + class BiFunctionBean { + @Test + void testBiFunctionBean() throws NoSuchMethodException { + Method method = getMethod(this.getClass(), "biFunctionBean"); + + Set data = functionalChannelBeanBuilder.build(method); + + Assertions.assertThat(data) + .containsExactlyInAnyOrder( + new FunctionalChannelBeanData( + "biFunctionBean", method, String.class, CONSUMER, "biFunctionBean-in-0"), + new FunctionalChannelBeanData( + "biFunctionBean", method, Integer.class, SUPPLIER, "biFunctionBean-out-0")); + } + + @Bean + private BiFunction, Integer> biFunctionBean() { + return (s, headers) -> 1; + } + } + @Nested class ConsumerBeanWithGenericPayload {