Skip to content

Commit

Permalink
Transaction.Current may not be set after opening the session (#184) (#…
Browse files Browse the repository at this point in the history
…186)

* A quick spike to verify we can float the tx scope




* Add a test

---------

Co-authored-by: Szymon Pobiega <[email protected]>
Co-authored-by: Tomasz Masternak <[email protected]>
  • Loading branch information
3 people authored Apr 6, 2023
1 parent b410b11 commit cb348a2
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Microsoft.Extensions.DependencyInjection;
using AcceptanceTesting;
using NUnit.Framework;
Expand Down Expand Up @@ -74,10 +75,39 @@ public async Task Should_send_immediate_dispatch_messages_even_if_session_is_not
Assert.True(result.MessageReceived);
}

[Test]
public async Task Should_make_it_possible_float_ambient_transactions()
{
var result = await Scenario.Define<Context>()
.WithEndpoint<AnEndpoint>(s => s.When(async (_, ctx) =>
{
using var scope = ctx.ServiceProvider.CreateScope();
using var transactionalSession = scope.ServiceProvider.GetRequiredService<ITransactionalSession>();

await transactionalSession.Open(new CustomTestingPersistenceOpenSessionOptions
{
UseTransactionScope = true
});

ctx.AmbientTransactionFoundBeforeAwait = Transaction.Current != null;

await Task.Yield();

ctx.AmbientTransactionFoundAfterAwait = Transaction.Current != null;
}))
.Done(c => c.EndpointsStarted)
.Run();

Assert.True(result.AmbientTransactionFoundBeforeAwait, "The ambient transaction was not visible before the await");
Assert.True(result.AmbientTransactionFoundAfterAwait, "The ambient transaction was not visible after the await");
}

class Context : ScenarioContext, IInjectServiceProvider
{
public bool MessageReceived { get; set; }
public bool AmbientTransactionFoundBeforeAwait { get; set; }
public bool AmbientTransactionFoundAfterAwait { get; set; }
public bool CompleteMessageReceived { get; set; }
public bool MessageReceived { get; set; }
public IServiceProvider ServiceProvider { get; set; }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ namespace NServiceBus.AcceptanceTesting
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Extensibility;
using Outbox;
using TransactionalSession;

sealed class CustomTestingOutboxStorage : IOutboxStorage
{
Expand All @@ -26,6 +28,11 @@ public Task<OutboxMessage> Get(string messageId, ContextBag context, Cancellatio

public Task<IOutboxTransaction> BeginTransaction(ContextBag context, CancellationToken cancellationToken = default)
{
if (context.TryGet(out CustomTestingPersistenceOpenSessionOptions options) && options.UseTransactionScope)
{
_ = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled);
}

return Task.FromResult<IOutboxTransaction>(new CustomTestingOutboxTransaction(context));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ public class CustomTestingPersistenceOpenSessionOptions : OpenSessionOptions
public CustomTestingPersistenceOpenSessionOptions() => Extensions.Set(this);

public TaskCompletionSource<bool> TransactionCommitTaskCompletionSource { get; set; }

public bool UseTransactionScope { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.TransactionalSession
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -25,7 +26,20 @@ protected override async Task CommitInternal(CancellationToken cancellationToken

public override async Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default)
{
await base.Open(options, cancellationToken).ConfigureAwait(false);
ThrowIfDisposed();
ThrowIfCommitted();

if (IsOpen)
{
throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once.");
}

this.options = options;

foreach (var customization in customizations)
{
customization.Apply(this.options);
}

await synchronizedStorageSession.Open(null, new TransportTransaction(), Context, cancellationToken).ConfigureAwait(false);
}
Expand Down
27 changes: 24 additions & 3 deletions src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,32 @@ static void SerializeRoutingStrategy(AddressTag addressTag, Dictionary<string, s
throw new Exception($"Unknown routing strategy {addressTag.GetType().FullName}");
}

public override async Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default)
public override Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default)
{
await base.Open(options, cancellationToken).ConfigureAwait(false);
ThrowIfDisposed();
ThrowIfCommitted();

outboxTransaction = await outboxStorage.BeginTransaction(Context, cancellationToken).ConfigureAwait(false);
if (IsOpen)
{
throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once.");
}

this.options = options;

foreach (var customization in customizations)
{
customization.Apply(this.options);
}

// Unfortunately this is the only way to make it possible for Transaction.Current to float up to the caller
// to make sure SQLP and NHibernate work with the transaction scope
var outboxTransactionTask = outboxStorage.BeginTransaction(Context, cancellationToken);
return OpenInternal(outboxTransactionTask, cancellationToken);
}

async Task OpenInternal(Task<IOutboxTransaction> beginTransactionTask, CancellationToken cancellationToken)
{
outboxTransaction = await beginTransactionTask.ConfigureAwait(false);

if (!await synchronizedStorageSession.TryOpen(outboxTransaction, Context, cancellationToken).ConfigureAwait(false))
{
Expand Down
27 changes: 4 additions & 23 deletions src/NServiceBus.TransactionalSession/TransactionalSessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,10 @@ public async Task Commit(CancellationToken cancellationToken = default)
committed = true;
}

public abstract Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default);

protected abstract Task CommitInternal(CancellationToken cancellationToken = default);

public virtual Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
ThrowIfCommitted();

if (IsOpen)
{
throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once.");
}

this.options = options;

foreach (var customization in customizations)
{
customization.Apply(this.options);
}

return Task.CompletedTask;
}

public async Task Send(object message, SendOptions sendOptions, CancellationToken cancellationToken = default)
{
ThrowIfInvalidState();
Expand Down Expand Up @@ -119,15 +100,15 @@ public async Task Publish<T>(Action<T> messageConstructor, PublishOptions publis
await messageSession.Publish(messageConstructor, publishOptions, cancellationToken).ConfigureAwait(false);
}

void ThrowIfDisposed()
protected void ThrowIfDisposed()
{
if (disposed)
{
throw new ObjectDisposedException(nameof(Dispose));
}
}

void ThrowIfCommitted()
protected void ThrowIfCommitted()
{
if (committed)
{
Expand Down Expand Up @@ -170,7 +151,7 @@ protected virtual void Dispose(bool disposing)

protected readonly ICompletableSynchronizedStorageSession synchronizedStorageSession;
protected readonly IMessageDispatcher dispatcher;
readonly IEnumerable<IOpenSessionOptionsCustomization> customizations;
protected readonly IEnumerable<IOpenSessionOptionsCustomization> customizations;
protected readonly PendingTransportOperations pendingOperations;
protected OpenSessionOptions options;
readonly IMessageSession messageSession;
Expand Down

0 comments on commit cb348a2

Please sign in to comment.