Skip to content

Commit

Permalink
Message with non-existing exchange in to field is rejected
Browse files Browse the repository at this point in the history
Instead of closing the link.

References rabbitmq/rabbitmq-server#12391
  • Loading branch information
acogoluegnes committed Sep 27, 2024
1 parent c9501a8 commit 24c20b6
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 27 deletions.
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,18 @@ private static void checkBrokerVersion(org.apache.qpid.protonj2.client.Connectio
}
}

private static String brokerVersion(org.apache.qpid.protonj2.client.Connection connection) {
try {
return (String) connection.properties().get("version");
} catch (ClientException e) {
throw ExceptionUtils.convert(e);
}
}

String brokerVersion() {
return brokerVersion(this.nativeConnection);
}

private static String extractNode(org.apache.qpid.protonj2.client.Connection connection)
throws ClientException {
String node = (String) connection.properties().get("node");
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/client/amqp/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private static String currentVersion(String currentVersion) {
/**
* https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java
*/
private static int versionCompare(String str1, String str2) {
static int versionCompare(String str1, String str2) {
String[] vals1 = str1.split("\\.");
String[] vals2 = str2.split("\\.");
int i = 0;
Expand Down
61 changes: 35 additions & 26 deletions src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
import static com.rabbitmq.client.amqp.Management.QueueType.*;
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3;
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
import static java.nio.charset.StandardCharsets.*;
import static java.util.Collections.emptyMap;
Expand All @@ -30,6 +31,7 @@
import static org.assertj.core.api.Assertions.*;

import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersionAtLeast;
import com.rabbitmq.client.amqp.impl.TestUtils.DisabledIfAddressV1Permitted;
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
import java.util.*;
Expand Down Expand Up @@ -378,34 +380,41 @@ void publisherSendingShouldThrowWhenQueueHasBeenDeleted() {
}

@Test
@BrokerVersionAtLeast(RABBITMQ_4_0_3)
void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToProperty() {
String doesNotExist = uuid();
Sync closedSync = sync();
AtomicReference<Throwable> closedException = new AtomicReference<>();
Publisher publisher =
connection
.publisherBuilder()
.listeners(closedListener(closedSync, ctx -> closedException.set(ctx.failureCause())))
.build();
AtomicReference<Exception> exception = new AtomicReference<>();
waitAtMost(
() -> {
try {
publisher.publish(
publisher.message().toAddress().exchange(doesNotExist).message(), ctx -> {});
return false;
} catch (AmqpException.AmqpEntityDoesNotExistException e) {
exception.set(e);
return true;
}
});
Assertions.assertThat(closedSync).completes();
of(exception.get(), closedException.get())
.forEach(
e ->
assertThat(e)
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class)
.hasMessageContaining(doesNotExist));
connection.management().queue(name).exclusive(true).declare();
Publisher publisher = connection.publisherBuilder().build();
Sync consumedSync = sync();
connection
.consumerBuilder()
.queue(name)
.messageHandler(
(ctx, msg) -> {
ctx.accept();
consumedSync.down();
})
.build();
Sync acceptedSync = sync();
publisher.publish(
publisher.message().toAddress().queue(name).message(), ctx -> acceptedSync.down());
Assertions.assertThat(acceptedSync).completes();
Assertions.assertThat(consumedSync).completes();

acceptedSync.reset();
consumedSync.reset();

Sync rejectedSync = sync();
publisher.publish(
publisher.message().toAddress().exchange(doesNotExist).message(),
ctx -> rejectedSync.down());
Assertions.assertThat(rejectedSync).completes();

Assertions.assertThat(consumedSync).hasNotCompleted();
publisher.publish(
publisher.message().toAddress().queue(name).message(), ctx -> acceptedSync.down());
Assertions.assertThat(acceptedSync).completes();
Assertions.assertThat(consumedSync).completes();
}

@Test
Expand Down
138 changes: 138 additions & 0 deletions src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.Environment;
import java.lang.annotation.*;
import java.util.function.Function;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TestConditions {

private static final Logger LOGGER = LoggerFactory.getLogger(TestConditions.class);

private TestConditions() {}

public enum BrokerVersion {
RABBITMQ_4_0_3("4.0.3");

final String value;

BrokerVersion(String value) {
this.value = value;
}

@Override
public String toString() {
return this.value;
}
}

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@ExtendWith(BrokerVersionAtLeastCondition.class)
public @interface BrokerVersionAtLeast {

BrokerVersion value();
}

private static class BrokerVersionAtLeastCondition implements ExecutionCondition {

private final Function<ExtensionContext, String> versionProvider;

private BrokerVersionAtLeastCondition() {
this.versionProvider =
context -> {
BrokerVersionAtLeast annotation =
context.getElement().get().getAnnotation(BrokerVersionAtLeast.class);
return annotation == null ? null : annotation.value().toString();
};
}

@Override
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
if (context.getTestMethod().isEmpty()) {
return ConditionEvaluationResult.enabled("Apply only to methods");
}
String expectedVersion = versionProvider.apply(context);
if (expectedVersion == null) {
return ConditionEvaluationResult.enabled("No broker version requirement");
} else {
String brokerVersion =
context
.getRoot()
.getStore(ExtensionContext.Namespace.GLOBAL)
.getOrComputeIfAbsent(
"brokerVersion",
k -> {
try (Environment env = TestUtils.environmentBuilder().build()) {
return ((AmqpConnection) env.connectionBuilder().build()).brokerVersion();
}
},
String.class);

if (atLeastVersion(expectedVersion, brokerVersion)) {
return ConditionEvaluationResult.enabled(
"Broker version requirement met, expected "
+ expectedVersion
+ ", actual "
+ brokerVersion);
} else {
return ConditionEvaluationResult.disabled(
"Broker version requirement not met, expected "
+ expectedVersion
+ ", actual "
+ brokerVersion);
}
}
}
}

private static boolean atLeastVersion(String expectedVersion, String currentVersion) {
try {
currentVersion = currentVersion(currentVersion);
return "0.0.0".equals(currentVersion)
|| Utils.versionCompare(currentVersion, expectedVersion) >= 0;
} catch (RuntimeException e) {
LOGGER.warn("Unable to parse broker version {}", currentVersion, e);
throw e;
}
}

private static String currentVersion(String currentVersion) {
// versions built from source: 3.7.0+rc.1.4.gedc5d96
if (currentVersion.contains("+")) {
currentVersion = currentVersion.substring(0, currentVersion.indexOf("+"));
}
// alpha (snapshot) versions: 3.7.0~alpha.449-1
if (currentVersion.contains("~")) {
currentVersion = currentVersion.substring(0, currentVersion.indexOf("~"));
}
// alpha (snapshot) versions: 3.7.1-alpha.40
if (currentVersion.contains("-")) {
currentVersion = currentVersion.substring(0, currentVersion.indexOf("-"));
}
return currentVersion;
}
}

0 comments on commit 24c20b6

Please sign in to comment.