From b4c409f187c6245ec432e66eae12ea4b7ef2404b Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Fri, 10 Jan 2025 14:57:19 +0100 Subject: [PATCH 1/6] Use isolated dispatch for control message to prevent attaching to outbox's transaction scope if transport supports transaction scopes --- .../OutboxTransactionalSession.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs index 4a7786c..880e37c 100644 --- a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs +++ b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs @@ -71,7 +71,7 @@ async Task DispatchControlMessage(CancellationToken cancellationToken) } } var message = new OutgoingMessage(SessionId, headers, ReadOnlyMemory.Empty); - var outgoingMessages = new TransportOperations(new TransportTransportOperation(message, new UnicastAddressTag(physicalQueueAddress))); + var outgoingMessages = new TransportOperations(new TransportTransportOperation(message, new UnicastAddressTag(physicalQueueAddress), null, DispatchConsistency.Isolated)); await dispatcher.Dispatch(outgoingMessages, new TransportTransaction(), cancellationToken).ConfigureAwait(false); } From 877f893c1ee19dfeb14e6d234ada41d3274ae449 Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Mon, 13 Jan 2025 10:45:10 +0100 Subject: [PATCH 2/6] Add an assert that verifies dispatch consistency --- .../TransactionalSessionTests.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/NServiceBus.TransactionalSession.Tests/TransactionalSessionTests.cs b/src/NServiceBus.TransactionalSession.Tests/TransactionalSessionTests.cs index 42fc9a5..a303735 100644 --- a/src/NServiceBus.TransactionalSession.Tests/TransactionalSessionTests.cs +++ b/src/NServiceBus.TransactionalSession.Tests/TransactionalSessionTests.cs @@ -6,6 +6,7 @@ using Extensibility; using Fakes; using NUnit.Framework; + using Transport; [TestFixture] public class TransactionalSessionTests @@ -124,6 +125,7 @@ public async Task Commit_should_send_message_and_commit_storage_tx() var dispatchedMessage = dispatched.outgoingMessages.UnicastTransportOperations.Single(); Assert.Multiple(() => { + Assert.That(dispatchedMessage.RequiredDispatchConsistency, Is.EqualTo(DispatchConsistency.Isolated)); Assert.That(dispatchedMessage.Message.MessageId, Is.EqualTo(messageId)); Assert.That(dispatchedMessage.Message.Headers.ContainsKey(Headers.ControlMessageHeader), Is.False); From ded67096562e33e0979f589a68cb1b8bf0ac7d35 Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Mon, 13 Jan 2025 10:49:04 +0100 Subject: [PATCH 3/6] Move the assert to the correct place --- .../OutboxTransactionalSessionTests.cs | 2 ++ .../TransactionalSessionTests.cs | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.TransactionalSession.Tests/OutboxTransactionalSessionTests.cs b/src/NServiceBus.TransactionalSession.Tests/OutboxTransactionalSessionTests.cs index 099abd4..7b321a1 100644 --- a/src/NServiceBus.TransactionalSession.Tests/OutboxTransactionalSessionTests.cs +++ b/src/NServiceBus.TransactionalSession.Tests/OutboxTransactionalSessionTests.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Extensibility; using Fakes; + using NServiceBus.Transport; using NUnit.Framework; [TestFixture] @@ -135,6 +136,7 @@ public async Task Commit_should_send_control_message_and_store_outbox_data() var controlMessage = dispatched.outgoingMessages.UnicastTransportOperations.Single(); Assert.Multiple(() => { + Assert.That(controlMessage.RequiredDispatchConsistency, Is.EqualTo(DispatchConsistency.Isolated)); Assert.That(controlMessage.Message.MessageId, Is.EqualTo(session.SessionId)); Assert.That(controlMessage.Message.Headers[Headers.ControlMessageHeader], Is.EqualTo(bool.TrueString)); Assert.That(controlMessage.Message.Body.IsEmpty, Is.True); diff --git a/src/NServiceBus.TransactionalSession.Tests/TransactionalSessionTests.cs b/src/NServiceBus.TransactionalSession.Tests/TransactionalSessionTests.cs index a303735..50b3b00 100644 --- a/src/NServiceBus.TransactionalSession.Tests/TransactionalSessionTests.cs +++ b/src/NServiceBus.TransactionalSession.Tests/TransactionalSessionTests.cs @@ -125,7 +125,6 @@ public async Task Commit_should_send_message_and_commit_storage_tx() var dispatchedMessage = dispatched.outgoingMessages.UnicastTransportOperations.Single(); Assert.Multiple(() => { - Assert.That(dispatchedMessage.RequiredDispatchConsistency, Is.EqualTo(DispatchConsistency.Isolated)); Assert.That(dispatchedMessage.Message.MessageId, Is.EqualTo(messageId)); Assert.That(dispatchedMessage.Message.Headers.ContainsKey(Headers.ControlMessageHeader), Is.False); From cbeec05f44c1656c9811ffaf088099c431a046fe Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 28 Jan 2025 14:24:16 +0100 Subject: [PATCH 4/6] Added comment on why Isolated is needed --- .../OutboxTransactionalSession.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs index 880e37c..ef6a491 100644 --- a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs +++ b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs @@ -71,8 +71,14 @@ async Task DispatchControlMessage(CancellationToken cancellationToken) } } var message = new OutgoingMessage(SessionId, headers, ReadOnlyMemory.Empty); - var outgoingMessages = new TransportOperations(new TransportTransportOperation(message, new UnicastAddressTag(physicalQueueAddress), null, DispatchConsistency.Isolated)); - await dispatcher.Dispatch(outgoingMessages, new TransportTransaction(), cancellationToken).ConfigureAwait(false); + var operation = new TransportTransportOperation( + message, + new UnicastAddressTag(physicalQueueAddress), + null, + DispatchConsistency.Isolated // We do not want the this dispatch to enlist in any active transaction scope like we do want for the outbox operation + ); + var outgoingMessages = new TransportOperations(); + await dispatcher.Dispatch(outgoingMessages, new TransportTransaction(operation), cancellationToken).ConfigureAwait(false); } protected override void Dispose(bool disposing) From 354a6d19ae2dd16fb9446e1c0950785039f1d7a0 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 28 Jan 2025 14:29:00 +0100 Subject: [PATCH 5/6] Now using correct constructor --- .../OutboxTransactionalSession.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs index ef6a491..b0063f0 100644 --- a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs +++ b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs @@ -77,8 +77,8 @@ async Task DispatchControlMessage(CancellationToken cancellationToken) null, DispatchConsistency.Isolated // We do not want the this dispatch to enlist in any active transaction scope like we do want for the outbox operation ); - var outgoingMessages = new TransportOperations(); - await dispatcher.Dispatch(outgoingMessages, new TransportTransaction(operation), cancellationToken).ConfigureAwait(false); + var outgoingMessages = new TransportOperations(operation); + await dispatcher.Dispatch(outgoingMessages, new TransportTransaction(), cancellationToken).ConfigureAwait(false); } protected override void Dispose(bool disposing) From ad077aa0de8fb3331f5113d60ba7f8100669a220 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Tue, 28 Jan 2025 14:38:49 +0100 Subject: [PATCH 6/6] Further improve comment --- .../OutboxTransactionalSession.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs index b0063f0..6867f11 100644 --- a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs +++ b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs @@ -75,7 +75,7 @@ async Task DispatchControlMessage(CancellationToken cancellationToken) message, new UnicastAddressTag(physicalQueueAddress), null, - DispatchConsistency.Isolated // We do not want the this dispatch to enlist in any active transaction scope like we do want for the outbox operation + DispatchConsistency.Isolated // Avoids promoting to distributed tx by not combining transport and persistence when both share same technology ); var outgoingMessages = new TransportOperations(operation); await dispatcher.Dispatch(outgoingMessages, new TransportTransaction(), cancellationToken).ConfigureAwait(false);