Skip to content

Commit

Permalink
Added new AdvisoryLock from Marten
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Aug 5, 2024
1 parent 3979d3e commit a4c595a
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/Weasel.Core/IAdvisoryLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Weasel.Core;

internal interface IAdvisoryLock: IAsyncDisposable
{
bool HasLock(int lockId);
Task<bool> TryAttainLockAsync(int lockId, CancellationToken token);
Task ReleaseLockAsync(int lockId);
}
66 changes: 65 additions & 1 deletion src/Weasel.Postgresql.Tests/advisory_lock_usage.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
using Npgsql;
using Microsoft.Extensions.Logging.Abstractions;
using Npgsql;
using Shouldly;
using Weasel.Core;
using Weasel.Core.Migrations;
using Xunit;

namespace Weasel.Postgresql.Tests;

public class advisory_lock_usage
{


[Fact]
public async Task explicitly_release_global_session_locks()
{
Expand Down Expand Up @@ -133,3 +138,62 @@ public async Task tx_session_locks()
await tx3.RollbackAsync();
}
}

public class AdvisoryLockSpecs : IAsyncLifetime
{
private SimplePostgresqlDatabase _database;
private AdvisoryLock theLock;

public Task InitializeAsync()
{
_database = new SimplePostgresqlDatabase(NpgsqlDataSource.Create(ConnectionSource.ConnectionString));
theLock = new AdvisoryLock(_database, NullLogger.Instance);
return Task.CompletedTask;
}

public async Task DisposeAsync()
{
await theLock.DisposeAsync();
await _database.DisposeAsync();
}


[Fact]
public async Task explicitly_release_global_session_locks()
{
await using var conn2 = new NpgsqlConnection(ConnectionSource.ConnectionString);
await using var conn3 = new NpgsqlConnection(ConnectionSource.ConnectionString);

await conn2.OpenAsync();
await conn3.OpenAsync();

await theLock.TryAttainLockAsync(1, CancellationToken.None);

// Cannot get the lock here
(await conn2.TryGetGlobalLock(1)).Succeeded.ShouldBeFalse();

await theLock.ReleaseLockAsync(1);

for (var j = 0; j < 5; j++)
{
if ((await conn2.TryGetGlobalLock(1)).Succeeded) return;

await Task.Delay(250);
}

throw new Exception("Advisory lock was not released");
}

}

public class SimplePostgresqlDatabase: PostgresqlDatabase
{
public SimplePostgresqlDatabase(NpgsqlDataSource dataSource) : base(new DefaultMigrationLogger(), AutoCreate.CreateOrUpdate, new PostgresqlMigrator(), "Simple", dataSource)
{
}

public override IFeatureSchema[] BuildFeatureSchemas()
{
return Array.Empty<IFeatureSchema>();
}
}
114 changes: 114 additions & 0 deletions src/Weasel.Postgresql/AdvisoryLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System.Data;
using JasperFx.Core;
using Microsoft.Extensions.Logging;
using Npgsql;
using Weasel.Core;
using Weasel.Core.Migrations;

namespace Weasel.Postgresql;

internal class AdvisoryLock : IAdvisoryLock
{
private readonly PostgresqlDatabase _database;
private readonly ILogger _logger;
private NpgsqlConnection _conn;
private readonly List<int> _locks = new();

public AdvisoryLock(PostgresqlDatabase database, ILogger logger)
{
_database = database;
_logger = logger;
}

public bool HasLock(int lockId)
{
return _conn is not { State: ConnectionState.Closed } && _locks.Contains(lockId);
}

public async Task<bool> TryAttainLockAsync(int lockId, CancellationToken token)
{
if (_conn == null)
{
_conn = _database.CreateConnection();
await _conn.OpenAsync(token).ConfigureAwait(false);
}

if (_conn.State == ConnectionState.Closed)
{
try
{
await _conn.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error trying to clean up and restart an advisory lock connection");
}
finally
{
_conn = null;
}

return false;
}



var attained = await _conn.TryGetGlobalLock(lockId, cancellation: token).ConfigureAwait(false);
if (attained == AttainLockResult.Success)
{
_locks.Add(lockId);
return true;
}

return false;
}

public async Task ReleaseLockAsync(int lockId)
{
if (!_locks.Contains(lockId)) return;

if (_conn == null || _conn.State == ConnectionState.Closed)
{
_locks.Remove(lockId);
return;
}

var cancellation = new CancellationTokenSource();
cancellation.CancelAfter(1.Seconds());

await _conn.ReleaseGlobalLock(lockId, cancellation: cancellation.Token).ConfigureAwait(false);
_locks.Remove(lockId);

if (!_locks.Any())
{
await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
_conn = null;
}
}

public async ValueTask DisposeAsync()
{
if (_conn == null) return;

try
{
foreach (var i in _locks)
{
await _conn.ReleaseGlobalLock(i, CancellationToken.None).ConfigureAwait(false);
}

await _conn.CloseAsync().ConfigureAwait(false);
await _conn.DisposeAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error trying to dispose of advisory locks for database {Identifier}",
_database.Identifier);
}
finally
{
await _conn.DisposeAsync().ConfigureAwait(false);
}
}
}
7 changes: 6 additions & 1 deletion src/Weasel.Postgresql/PostgresqlDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Weasel.Postgresql;

public abstract class PostgresqlDatabase: DatabaseBase<NpgsqlConnection>
public abstract class PostgresqlDatabase: DatabaseBase<NpgsqlConnection>, IAsyncDisposable
{
protected PostgresqlDatabase(
IMigrationLogger logger,
Expand Down Expand Up @@ -58,4 +58,9 @@ public NpgsqlConnection CreateConnection(TargetSessionAttributes targetSessionAt

return base.CreateConnection();
}

public ValueTask DisposeAsync()
{
return DataSource.DisposeAsync();
}
}

0 comments on commit a4c595a

Please sign in to comment.