From 45f5063a3ea6c953fa247b09d4b5c84f6adc1343 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 5 Aug 2024 11:14:53 -0500 Subject: [PATCH] Adding AdvisoryLock for both Postgresql and Sql Server. Bumps to 7.7.0 --- Directory.Build.props | 2 +- src/Weasel.Core/IAdvisoryLock.cs | 2 +- .../advisory_lock_usage.cs | 46 +++++++ src/Weasel.SqlServer/AdvisoryLock.cs | 115 ++++++++++++++++++ src/Weasel.SqlServer/Weasel.SqlServer.csproj | 1 + 5 files changed, 164 insertions(+), 2 deletions(-) create mode 100644 src/Weasel.SqlServer/AdvisoryLock.cs diff --git a/Directory.Build.props b/Directory.Build.props index 62793b0..0279bdd 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 7.6.0 + 7.7.0 12.0 enable enable diff --git a/src/Weasel.Core/IAdvisoryLock.cs b/src/Weasel.Core/IAdvisoryLock.cs index 88477df..ceb6ace 100644 --- a/src/Weasel.Core/IAdvisoryLock.cs +++ b/src/Weasel.Core/IAdvisoryLock.cs @@ -1,6 +1,6 @@ namespace Weasel.Core; -internal interface IAdvisoryLock: IAsyncDisposable +public interface IAdvisoryLock: IAsyncDisposable { bool HasLock(int lockId); Task TryAttainLockAsync(int lockId, CancellationToken token); diff --git a/src/Weasel.SqlServer.Tests/advisory_lock_usage.cs b/src/Weasel.SqlServer.Tests/advisory_lock_usage.cs index 66b695e..d5b2ae6 100644 --- a/src/Weasel.SqlServer.Tests/advisory_lock_usage.cs +++ b/src/Weasel.SqlServer.Tests/advisory_lock_usage.cs @@ -1,4 +1,5 @@ using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging.Abstractions; using Shouldly; using Xunit; @@ -142,3 +143,48 @@ public async Task tx_session_locks() } } } + +public class AdvisoryLockSpecs : IAsyncLifetime +{ + private AdvisoryLock theLock; + + public Task InitializeAsync() + { + + theLock = new AdvisoryLock(() => new SqlConnection(ConnectionSource.ConnectionString), NullLogger.Instance, "Testing"); + return Task.CompletedTask; + } + + public async Task DisposeAsync() + { + await theLock.DisposeAsync(); + } + + + [Fact] + public async Task explicitly_release_global_session_locks() + { + await using var conn2 = new SqlConnection(ConnectionSource.ConnectionString); + await using var conn3 = new SqlConnection(ConnectionSource.ConnectionString); + + await conn2.OpenAsync(); + await conn3.OpenAsync(); + + await theLock.TryAttainLockAsync(1, CancellationToken.None); + + // Cannot get the lock here + (await conn2.TryGetGlobalLock(1.ToString())).ShouldBeFalse(); + + await theLock.ReleaseLockAsync(1); + + for (var j = 0; j < 5; j++) + { + if ((await conn2.TryGetGlobalLock(1.ToString()))) return; + + await Task.Delay(250); + } + + throw new Exception("Advisory lock was not released"); + } + +} diff --git a/src/Weasel.SqlServer/AdvisoryLock.cs b/src/Weasel.SqlServer/AdvisoryLock.cs new file mode 100644 index 0000000..4b7957b --- /dev/null +++ b/src/Weasel.SqlServer/AdvisoryLock.cs @@ -0,0 +1,115 @@ +using System.Data; +using JasperFx.Core; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using Weasel.Core; + +namespace Weasel.SqlServer; + +internal class AdvisoryLock : IAdvisoryLock +{ + private readonly Func _source; + private readonly ILogger _logger; + private readonly string _databaseName; + private SqlConnection _conn; + private readonly List _locks = new(); + + public AdvisoryLock(Func source, ILogger logger, string databaseName) + { + _source = source; + _logger = logger; + _databaseName = databaseName; + } + + public bool HasLock(int lockId) + { + return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId); + } + + public async Task TryAttainLockAsync(int lockId, CancellationToken token) + { + if (_conn == null) + { + _conn = _source(); + await _conn.OpenAsync(token).ConfigureAwait(false); + } + + if (_conn.State == ConnectionState.Closed) + { + try + { + await _conn.DisposeAsync().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to clean up and restart an advisory lock connection"); + } + finally + { + _conn = null; + } + + return false; + } + + + + var attained = await _conn.TryGetGlobalLock(lockId.ToString(), cancellation: token).ConfigureAwait(false); + if (attained) + { + _locks.Add(lockId); + return true; + } + + return false; + } + + public async Task ReleaseLockAsync(int lockId) + { + if (!_locks.Contains(lockId)) return; + + if (_conn == null || _conn.State == ConnectionState.Closed) + { + _locks.Remove(lockId); + return; + } + + var cancellation = new CancellationTokenSource(); + cancellation.CancelAfter(1.Seconds()); + + await _conn.ReleaseGlobalLock(lockId.ToString(), cancellation: cancellation.Token).ConfigureAwait(false); + _locks.Remove(lockId); + + if (!_locks.Any()) + { + await _conn.CloseAsync().ConfigureAwait(false); + await _conn.DisposeAsync().ConfigureAwait(false); + _conn = null; + } + } + + public async ValueTask DisposeAsync() + { + if (_conn == null) return; + + try + { + foreach (var i in _locks) + { + await _conn.ReleaseGlobalLock(i.ToString(), CancellationToken.None).ConfigureAwait(false); + } + + await _conn.CloseAsync().ConfigureAwait(false); + await _conn.DisposeAsync().ConfigureAwait(false); + } + catch (Exception e) + { + _logger.LogError(e, "Error trying to dispose of advisory locks for database {Identifier}", + _databaseName); + } + finally + { + await _conn.DisposeAsync().ConfigureAwait(false); + } + } +} diff --git a/src/Weasel.SqlServer/Weasel.SqlServer.csproj b/src/Weasel.SqlServer/Weasel.SqlServer.csproj index a8f48c9..06a33fd 100644 --- a/src/Weasel.SqlServer/Weasel.SqlServer.csproj +++ b/src/Weasel.SqlServer/Weasel.SqlServer.csproj @@ -17,6 +17,7 @@ +