Skip to content

Commit

Permalink
Adding AdvisoryLock for both Postgresql and Sql Server. Bumps to 7.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Aug 5, 2024
1 parent a4c595a commit b304e91
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project>
<PropertyGroup>
<Version>7.6.0</Version>
<Version>7.7.0</Version>
<LangVersion>12.0</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
Expand Down
2 changes: 1 addition & 1 deletion src/Weasel.Core/IAdvisoryLock.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Weasel.Core;

internal interface IAdvisoryLock: IAsyncDisposable
public interface IAdvisoryLock: IAsyncDisposable
{
bool HasLock(int lockId);
Task<bool> TryAttainLockAsync(int lockId, CancellationToken token);
Expand Down
8 changes: 4 additions & 4 deletions src/Weasel.Postgresql.Tests/advisory_lock_usage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,16 @@ public async Task explicitly_release_global_session_locks()
await conn2.OpenAsync();
await conn3.OpenAsync();

await theLock.TryAttainLockAsync(1, CancellationToken.None);
await theLock.TryAttainLockAsync(10, CancellationToken.None);

// Cannot get the lock here
(await conn2.TryGetGlobalLock(1)).Succeeded.ShouldBeFalse();
(await conn2.TryGetGlobalLock(10)).Succeeded.ShouldBeFalse();

await theLock.ReleaseLockAsync(1);
await theLock.ReleaseLockAsync(10);

for (var j = 0; j < 5; j++)
{
if ((await conn2.TryGetGlobalLock(1)).Succeeded) return;
if ((await conn2.TryGetGlobalLock(10)).Succeeded) return;

await Task.Delay(250);
}
Expand Down
46 changes: 46 additions & 0 deletions src/Weasel.SqlServer.Tests/advisory_lock_usage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

Expand Down Expand Up @@ -142,3 +143,48 @@ public async Task tx_session_locks()
}
}
}

public class AdvisoryLockSpecs : IAsyncLifetime
{
private AdvisoryLock theLock;

public Task InitializeAsync()
{

theLock = new AdvisoryLock(() => new SqlConnection(ConnectionSource.ConnectionString), NullLogger.Instance, "Testing");
return Task.CompletedTask;
}

public async Task DisposeAsync()
{
await theLock.DisposeAsync();
}


[Fact]
public async Task explicitly_release_global_session_locks()
{
await using var conn2 = new SqlConnection(ConnectionSource.ConnectionString);
await using var conn3 = new SqlConnection(ConnectionSource.ConnectionString);

await conn2.OpenAsync();
await conn3.OpenAsync();

await theLock.TryAttainLockAsync(10, CancellationToken.None);

// Cannot get the lock here
(await conn2.TryGetGlobalLock(10.ToString())).ShouldBeFalse();

await theLock.ReleaseLockAsync(10);

for (var j = 0; j < 5; j++)
{
if ((await conn2.TryGetGlobalLock(10.ToString()))) return;

await Task.Delay(250);
}

throw new Exception("Advisory lock was not released");
}

}
115 changes: 115 additions & 0 deletions src/Weasel.SqlServer/AdvisoryLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
using System.Data;
using JasperFx.Core;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using Weasel.Core;

namespace Weasel.SqlServer;

internal class AdvisoryLock : IAdvisoryLock
{
private readonly Func<SqlConnection> _source;
private readonly ILogger _logger;
private readonly string _databaseName;
private SqlConnection _conn;
private readonly List<int> _locks = new();

public AdvisoryLock(Func<SqlConnection> source, ILogger logger, string databaseName)
{
_source = source;
_logger = logger;
_databaseName = databaseName;
}

public bool HasLock(int lockId)
{
return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
}

public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
{
if (_conn == null)
{
_conn = _source();
await _conn.OpenAsync(token).ConfigureAwait(false);
}

if (_conn.State == ConnectionState.Closed)
{
try
{
await _conn.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error trying to clean up and restart an advisory lock connection");
}
finally
{
_conn = null;
}

return false;
}



var attained = await _conn.TryGetGlobalLock(lockId.ToString(), cancellation: token).ConfigureAwait(false);
if (attained)
{
_locks.Add(lockId);
return true;
}

return false;
}

public async Task ReleaseLockAsync(int lockId)
{
if (!_locks.Contains(lockId)) return;

if (_conn == null || _conn.State == ConnectionState.Closed)
{
_locks.Remove(lockId);
return;
}

var cancellation = new CancellationTokenSource();
cancellation.CancelAfter(1.Seconds());

await _conn.ReleaseGlobalLock(lockId.ToString(), cancellation: cancellation.Token).ConfigureAwait(false);
_locks.Remove(lockId);

if (!_locks.Any())
{
await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
_conn = null;
}
}

public async ValueTask DisposeAsync()
{
if (_conn == null) return;

try
{
foreach (var i in _locks)
{
await _conn.ReleaseGlobalLock(i.ToString(), CancellationToken.None).ConfigureAwait(false);
}

await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error trying to dispose of advisory locks for database {Identifier}",
_databaseName);
}
finally
{
await _conn.DisposeAsync().ConfigureAwait(false);
}
}
}
1 change: 1 addition & 0 deletions src/Weasel.SqlServer/Weasel.SqlServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.1.3" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit b304e91

Please sign in to comment.