diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index 4f29fe404..aaac7b92b 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -244,6 +244,18 @@ private static void checkBrokerVersion(org.apache.qpid.protonj2.client.Connectio } } + private static String brokerVersion(org.apache.qpid.protonj2.client.Connection connection) { + try { + return (String) connection.properties().get("version"); + } catch (ClientException e) { + throw ExceptionUtils.convert(e); + } + } + + String brokerVersion() { + return brokerVersion(this.nativeConnection); + } + private static String extractNode(org.apache.qpid.protonj2.client.Connection connection) throws ClientException { String node = (String) connection.properties().get("node"); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java index 12060a4c1..8c3744746 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/Utils.java @@ -202,7 +202,7 @@ private static String currentVersion(String currentVersion) { /** * https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java */ - private static int versionCompare(String str1, String str2) { + static int versionCompare(String str1, String str2) { String[] vals1 = str1.split("\\."); String[] vals2 = str2.split("\\."); int i = 0; diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java index 6a7b777b6..1a57491fd 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java @@ -21,6 +21,7 @@ import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT; import static com.rabbitmq.client.amqp.Management.QueueType.*; import static com.rabbitmq.client.amqp.Management.QueueType.STREAM; +import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3; import static com.rabbitmq.client.amqp.impl.TestUtils.*; import static java.nio.charset.StandardCharsets.*; import static java.util.Collections.emptyMap; @@ -30,6 +31,7 @@ import static org.assertj.core.api.Assertions.*; import com.rabbitmq.client.amqp.*; +import com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersionAtLeast; import com.rabbitmq.client.amqp.impl.TestUtils.DisabledIfAddressV1Permitted; import com.rabbitmq.client.amqp.impl.TestUtils.Sync; import java.util.*; @@ -378,34 +380,41 @@ void publisherSendingShouldThrowWhenQueueHasBeenDeleted() { } @Test + @BrokerVersionAtLeast(RABBITMQ_4_0_3) void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToProperty() { String doesNotExist = uuid(); - Sync closedSync = sync(); - AtomicReference closedException = new AtomicReference<>(); - Publisher publisher = - connection - .publisherBuilder() - .listeners(closedListener(closedSync, ctx -> closedException.set(ctx.failureCause()))) - .build(); - AtomicReference exception = new AtomicReference<>(); - waitAtMost( - () -> { - try { - publisher.publish( - publisher.message().toAddress().exchange(doesNotExist).message(), ctx -> {}); - return false; - } catch (AmqpException.AmqpEntityDoesNotExistException e) { - exception.set(e); - return true; - } - }); - Assertions.assertThat(closedSync).completes(); - of(exception.get(), closedException.get()) - .forEach( - e -> - assertThat(e) - .isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class) - .hasMessageContaining(doesNotExist)); + connection.management().queue(name).exclusive(true).declare(); + Publisher publisher = connection.publisherBuilder().build(); + Sync consumedSync = sync(); + connection + .consumerBuilder() + .queue(name) + .messageHandler( + (ctx, msg) -> { + ctx.accept(); + consumedSync.down(); + }) + .build(); + Sync acceptedSync = sync(); + publisher.publish( + publisher.message().toAddress().queue(name).message(), ctx -> acceptedSync.down()); + Assertions.assertThat(acceptedSync).completes(); + Assertions.assertThat(consumedSync).completes(); + + acceptedSync.reset(); + consumedSync.reset(); + + Sync rejectedSync = sync(); + publisher.publish( + publisher.message().toAddress().exchange(doesNotExist).message(), + ctx -> rejectedSync.down()); + Assertions.assertThat(rejectedSync).completes(); + + Assertions.assertThat(consumedSync).hasNotCompleted(); + publisher.publish( + publisher.message().toAddress().queue(name).message(), ctx -> acceptedSync.down()); + Assertions.assertThat(acceptedSync).completes(); + Assertions.assertThat(consumedSync).completes(); } @Test diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java new file mode 100644 index 000000000..a56924cf1 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java @@ -0,0 +1,138 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.amqp.impl; + +import com.rabbitmq.client.amqp.Environment; +import java.lang.annotation.*; +import java.util.function.Function; +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class TestConditions { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestConditions.class); + + private TestConditions() {} + + public enum BrokerVersion { + RABBITMQ_4_0_3("4.0.3"); + + final String value; + + BrokerVersion(String value) { + this.value = value; + } + + @Override + public String toString() { + return this.value; + } + } + + @Target({ElementType.TYPE, ElementType.METHOD}) + @Retention(RetentionPolicy.RUNTIME) + @Documented + @ExtendWith(BrokerVersionAtLeastCondition.class) + public @interface BrokerVersionAtLeast { + + BrokerVersion value(); + } + + private static class BrokerVersionAtLeastCondition implements ExecutionCondition { + + private final Function versionProvider; + + private BrokerVersionAtLeastCondition() { + this.versionProvider = + context -> { + BrokerVersionAtLeast annotation = + context.getElement().get().getAnnotation(BrokerVersionAtLeast.class); + return annotation == null ? null : annotation.value().toString(); + }; + } + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + if (context.getTestMethod().isEmpty()) { + return ConditionEvaluationResult.enabled("Apply only to methods"); + } + String expectedVersion = versionProvider.apply(context); + if (expectedVersion == null) { + return ConditionEvaluationResult.enabled("No broker version requirement"); + } else { + String brokerVersion = + context + .getRoot() + .getStore(ExtensionContext.Namespace.GLOBAL) + .getOrComputeIfAbsent( + "brokerVersion", + k -> { + try (Environment env = TestUtils.environmentBuilder().build()) { + return ((AmqpConnection) env.connectionBuilder().build()).brokerVersion(); + } + }, + String.class); + + if (atLeastVersion(expectedVersion, brokerVersion)) { + return ConditionEvaluationResult.enabled( + "Broker version requirement met, expected " + + expectedVersion + + ", actual " + + brokerVersion); + } else { + return ConditionEvaluationResult.disabled( + "Broker version requirement not met, expected " + + expectedVersion + + ", actual " + + brokerVersion); + } + } + } + } + + private static boolean atLeastVersion(String expectedVersion, String currentVersion) { + try { + currentVersion = currentVersion(currentVersion); + return "0.0.0".equals(currentVersion) + || Utils.versionCompare(currentVersion, expectedVersion) >= 0; + } catch (RuntimeException e) { + LOGGER.warn("Unable to parse broker version {}", currentVersion, e); + throw e; + } + } + + private static String currentVersion(String currentVersion) { + // versions built from source: 3.7.0+rc.1.4.gedc5d96 + if (currentVersion.contains("+")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("+")); + } + // alpha (snapshot) versions: 3.7.0~alpha.449-1 + if (currentVersion.contains("~")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("~")); + } + // alpha (snapshot) versions: 3.7.1-alpha.40 + if (currentVersion.contains("-")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("-")); + } + return currentVersion; + } +}