diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtil.java b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtil.java
index 541f3ff78..0edce4270 100644
--- a/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtil.java
+++ b/springwolf-plugins/springwolf-amqp-plugin/src/main/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtil.java
@@ -46,7 +46,6 @@
*
Consumer consumes the message from the queue
*
*/
-// TODO: should this do validation and throw errors when an invalid rabbit configuration is found?
@Slf4j
public class RabbitListenerUtil {
public static final String BINDING_NAME = "amqp";
@@ -60,47 +59,29 @@ public static String getChannelName(RabbitListener annotation, StringValueResolv
Stream annotationBindingChannelNames = Arrays.stream(annotation.bindings())
.flatMap(binding -> channelNameFromAnnotationBindings(binding, resolver));
- return Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames)
- .map(resolver::resolveStringValue)
- .filter(Objects::nonNull)
- .peek(queue -> log.debug("Resolved channel name: {}", queue))
- .findFirst()
- .orElseThrow(
- () -> new IllegalArgumentException(
- "No channel name was found in @RabbitListener annotation (neither in queues nor bindings property)"));
+ Stream stream = Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames);
+ return resolveFirstValue(stream, resolver, "channel name");
}
public static String getChannelId(RabbitListener annotation, StringValueResolver resolver) {
Stream annotationBindingChannelIds = Arrays.stream(annotation.bindings())
.flatMap(binding -> channelIdFromAnnotationBindings(binding, resolver));
- return Stream.concat(streamQueueNames(annotation).map(ReferenceUtil::toValidId), annotationBindingChannelIds)
- .map(resolver::resolveStringValue)
- .filter(Objects::nonNull)
- .peek(queue -> log.debug("Resolved channel id: {}", queue))
- .findFirst()
- .orElseThrow(
- () -> new IllegalArgumentException(
- "No channel id was found in @RabbitListener annotation (neither in queues nor bindings property)"));
+ Stream stream =
+ Stream.concat(streamQueueNames(annotation).map(ReferenceUtil::toValidId), annotationBindingChannelIds);
+ return resolveFirstValue(stream, resolver, "channel id");
}
private static String getQueueName(RabbitListener annotation, StringValueResolver resolver) {
- Stream annotationBindingChannelNames = Arrays.stream(annotation.bindings())
+ Stream annotationBindingQueueNames = Arrays.stream(annotation.bindings())
.flatMap(binding -> Stream.of(binding.value().name()));
- return Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames)
- .map(resolver::resolveStringValue)
- .filter(Objects::nonNull)
- .peek(queue -> log.debug("Resolved queue name: {}", queue))
- .findFirst()
- .orElseThrow(
- () -> new IllegalArgumentException(
- "No queue name was found in @RabbitListener annotation (neither in queues nor bindings property)"));
+ Stream stream = Stream.concat(streamQueueNames(annotation), annotationBindingQueueNames);
+ return resolveFirstValue(stream, resolver, "queue name");
}
private static Stream channelNameFromAnnotationBindings(
QueueBinding binding, StringValueResolver resolver) {
- String queueName = resolver.resolveStringValue(binding.value().name());
String exchangeName = resolver.resolveStringValue(binding.exchange().name());
String[] routingKeys = binding.key();
@@ -111,15 +92,6 @@ private static Stream channelNameFromAnnotationBindings(
return Arrays.stream(routingKeys).map(resolver::resolveStringValue).map(routingKey -> exchangeName);
}
- private static String exchangeTargetChannelIdFromAnnotationBindings(
- RabbitListener annotation, StringValueResolver resolver) {
- return Arrays.stream(annotation.bindings())
- .map(binding -> binding.value().name() + "-id")
- .map(resolver::resolveStringValue)
- .findFirst()
- .orElse(null);
- }
-
private static Stream channelIdFromAnnotationBindings(QueueBinding binding, StringValueResolver resolver) {
String queueName = resolver.resolveStringValue(binding.value().name());
String exchangeName = resolver.resolveStringValue(binding.exchange().name());
@@ -170,6 +142,14 @@ public static Map buildChannelBinding(
return Map.of(BINDING_NAME, channelBinding.build());
}
+ private static String exchangeTargetChannelIdFromAnnotationBindings(
+ RabbitListener annotation, StringValueResolver resolver) {
+ Stream stream = Arrays.stream(annotation.bindings())
+ .map(binding -> binding.value().name() + "-id");
+
+ return resolveFirstValue(stream, resolver, "exchange target channel id");
+ }
+
private static AMQPChannelExchangeProperties buildExchangeProperties(
RabbitListener annotation, String exchangeName, RabbitListenerUtilContext context) {
@@ -314,4 +294,13 @@ public static Map buildMessageBinding() {
// currently the feature to define amqp message binding is not implemented.
return Map.of(BINDING_NAME, new AMQPMessageBinding());
}
+
+ private static String resolveFirstValue(Stream values, StringValueResolver resolver, String valueType) {
+ return values.map(resolver::resolveStringValue)
+ .filter(Objects::nonNull)
+ .peek(value -> log.debug("Resolved {}: {}", valueType, value))
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("No " + valueType
+ + " was found in @RabbitListener annotation (neither in queues nor bindings property)"));
+ }
}
diff --git a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtilTest.java b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtilTest.java
index a3b7323c2..54afa5f74 100644
--- a/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtilTest.java
+++ b/springwolf-plugins/springwolf-amqp-plugin/src/test/java/io/github/springwolf/plugins/amqp/asyncapi/scanners/bindings/RabbitListenerUtilTest.java
@@ -14,7 +14,6 @@
import io.github.springwolf.asyncapi.v3.model.channel.ChannelReference;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.TopicExchange;
@@ -106,8 +105,10 @@ void buildChannelBinding() {
channelBinding.get("amqp"));
}
+ /**
+ * Technically an invalid configuration as queue will be part of the spring context
+ */
@Test
- @Disabled("TODO: what to do with invalid configuration")
void buildChannelBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithQueuesConfiguration.class);
@@ -148,8 +149,10 @@ void buildOperationBinding() {
assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}
+ /**
+ * Technically an invalid configuration as queue will be part of the spring context
+ */
@Test
- @Disabled("TODO: what to do with invalid configuration")
void buildOperationBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithQueuesConfiguration.class);
@@ -161,7 +164,7 @@ void buildOperationBindingWithEmptyContext() {
// then
assertEquals(1, operationBinding.size());
assertEquals(Sets.newTreeSet("amqp"), operationBinding.keySet());
- assertEquals(AMQPOperationBinding.builder().cc(List.of("queue-1")).build(), operationBinding.get("amqp"));
+ assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}
@Test
@@ -197,8 +200,10 @@ void getChannelName() {
assertEquals("queue-1", channelName);
}
+ /**
+ * Technically an invalid configuration as context should be empty
+ */
@Test
- @Disabled("TODO: what to do with invalid configuration")
void buildChannelBinding() {
// given
RabbitListener annotation = getAnnotation(ClassWithQueuesToDeclare.class);
@@ -215,9 +220,9 @@ void buildChannelBinding() {
.is(AMQPChannelType.QUEUE)
.queue(AMQPChannelQueueProperties.builder()
.name("queue-1")
- .durable(true)
- .autoDelete(false)
- .exclusive(false)
+ .durable(false)
+ .autoDelete(true)
+ .exclusive(true)
.vhost("/")
.build())
.build(),
@@ -250,8 +255,10 @@ void buildChannelBindingWithEmptyContext() {
channelBinding.get("amqp"));
}
+ /**
+ * Technically an invalid configuration as context should be empty
+ */
@Test
- @Disabled("TODO: what to do with invalid configuration")
void buildOperationBinding() {
// given
RabbitListener annotation = getAnnotation(ClassWithQueuesToDeclare.class);
@@ -263,7 +270,7 @@ void buildOperationBinding() {
// then
assertEquals(1, operationBinding.size());
assertEquals(Sets.newTreeSet("amqp"), operationBinding.keySet());
- assertEquals(AMQPOperationBinding.builder().cc(List.of("queue-1")).build(), operationBinding.get("amqp"));
+ assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}
@Test
@@ -380,8 +387,10 @@ void buildChannelBindingWithExchangeContext() {
channelBinding.get("amqp"));
}
+ /**
+ * Technically an invalid configuration as queue and exchange will be part of the spring context
+ */
@Test
- @Disabled("TODO: what to do with invalid configuration")
void buildChannelBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithBindingConfiguration.class);
@@ -396,9 +405,11 @@ void buildChannelBindingWithEmptyContext() {
assertEquals(
AMQPChannelBinding.builder()
.is(AMQPChannelType.ROUTING_KEY)
+ .name("#")
+ .channel(ChannelReference.fromChannel("queue-1-id"))
.exchange(AMQPChannelExchangeProperties.builder()
.name("exchange-name")
- .type(AMQPChannelExchangeType.TOPIC)
+ .type(AMQPChannelExchangeType.DIRECT)
.durable(true)
.autoDelete(false)
.build())
@@ -436,8 +447,10 @@ void buildOperationBindingWithExchangeContext() {
assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}
+ /**
+ * Technically an invalid configuration as queue and exchange will be part of the spring context
+ */
@Test
- @Disabled("TODO: what to do with invalid configuration")
void buildOperationBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithBindingConfiguration.class);
@@ -449,7 +462,7 @@ void buildOperationBindingWithEmptyContext() {
// then
assertEquals(1, operationBinding.size());
assertEquals(Sets.newTreeSet("amqp"), operationBinding.keySet());
- assertEquals(AMQPOperationBinding.builder().cc(List.of("")).build(), operationBinding.get("amqp"));
+ assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}
@Test
@@ -528,7 +541,7 @@ void buildChannelBinding() {
}
@Test
- @Disabled("TODO: what to do with invalid configuration")
+ // @Disabled("TODO: what to do with invalid configuration")
void buildChannelBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithBindingsAndRoutingKeyConfiguration.class);
@@ -543,6 +556,8 @@ void buildChannelBindingWithEmptyContext() {
assertEquals(
AMQPChannelBinding.builder()
.is(AMQPChannelType.ROUTING_KEY)
+ .name("routing-key")
+ .channel(ChannelReference.fromChannel("queue-1-id"))
.exchange(AMQPChannelExchangeProperties.builder()
.name("exchange-name")
.type(AMQPChannelExchangeType.DIRECT)
@@ -569,7 +584,7 @@ void buildOperationBinding() {
}
@Test
- @Disabled("TODO: what to do with invalid configuration")
+ // @Disabled("TODO: what to do with invalid configuration")
void buildOperationBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithBindingsAndRoutingKeyConfiguration.class);
@@ -581,8 +596,7 @@ void buildOperationBindingWithEmptyContext() {
// then
assertEquals(1, operationBinding.size());
assertEquals(Sets.newTreeSet("amqp"), operationBinding.keySet());
- assertEquals(
- AMQPOperationBinding.builder().cc(List.of("routing-key")).build(), operationBinding.get("amqp"));
+ assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}
@Test