diff --git a/docs/content/user-guide/en/storage/postgresql.md b/docs/content/user-guide/en/storage/postgresql.md index 7835b7f3a..04a47fd18 100644 --- a/docs/content/user-guide/en/storage/postgresql.md +++ b/docs/content/user-guide/en/storage/postgresql.md @@ -32,10 +32,11 @@ public void ConfigureServices(IServiceCollection services) #### PostgreSqlOptions -NAME | DESCRIPTION | TYPE | DEFAULT -:---|:---|---|:--- -Schema | Database schema | string | cap -ConnectionString | Database connection string | string | +NAME | DESCRIPTION | TYPE | DEFAULT +:---|:---------------------------|----------------------|:--- +Schema | Database schema | string | cap +ConnectionString | Database connection string | string | +DataSource | [Data source](https://www.npgsql.org/doc/basic-usage.html#data-source) | [NpgsqlDataSource](https://www.npgsql.org/doc/api/Npgsql.NpgsqlDataSource.html) | ## Publish with transaction diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs index 225d2aa07..10b735f80 100644 --- a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs @@ -6,6 +6,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; +using Npgsql; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP; @@ -15,7 +16,21 @@ public class PostgreSqlOptions : EFOptions /// /// Gets or sets the database's connection string that will be used to store database entities. /// + [Obsolete("Use .DataSource = NpgsqlDataSource.Create() for same behavior.")] public string ConnectionString { get; set; } = default!; + + /// + /// Gets or sets the Npgsql data source that will be used to store database entities. + /// + public NpgsqlDataSource? DataSource { get; set; } + + /// + /// Creates an Npgsql connection from the configured data source. + /// + internal NpgsqlConnection CreateConnection() + { + return DataSource != null ? DataSource.CreateConnection() : new NpgsqlConnection(ConnectionString); + } } internal class ConfigurePostgreSqlOptions : IConfigureOptions diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index 43d0b879c..8a62956c1 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -50,7 +50,7 @@ public async Task AcquireLockAsync(string key, TimeSpan ttl, string instan { var sql = $"UPDATE {_lockName} SET \"Instance\"=@Instance,\"LastLockTime\"=@LastLockTime WHERE \"Key\"=@Key AND \"LastLockTime\" < @TTL;"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); object[] sqlParams = { @@ -67,7 +67,7 @@ public async Task ReleaseLockAsync(string key, string instance, CancellationToke { var sql = $"UPDATE {_lockName} SET \"Instance\"='',\"LastLockTime\"=@LastLockTime WHERE \"Key\"=@Key AND \"Instance\"=@Instance;"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); object[] sqlParams = { @@ -82,7 +82,7 @@ public async Task RenewLockAsync(string key, TimeSpan ttl, string instance, Canc { var sql = $"UPDATE {_lockName} SET \"LastLockTime\"=\"LastLockTime\"+interval '{ttl.TotalSeconds}' second WHERE \"Key\"=@Key AND \"Instance\"=@Instance;"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); object[] sqlParams = { @@ -96,7 +96,7 @@ public async Task ChangePublishStateToDelayedAsync(string[] ids) { var sql = $"UPDATE {_pubName} SET \"StatusName\"='{StatusName.Delayed}' WHERE \"Id\" IN ({string.Join(',', ids)});"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); await connection.ExecuteNonQueryAsync(sql).ConfigureAwait(false); } @@ -140,7 +140,7 @@ public async Task StoreMessageAsync(string name, Message content, if (transaction == null) { - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } @@ -205,7 +205,7 @@ public async Task StoreReceivedMessageAsync(string name, string g public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteNonQueryAsync( $"DELETE FROM {table} WHERE \"Id\" IN (SELECT \"Id\" FROM {table} WHERE \"ExpiresAt\" < @timeout AND (\"StatusName\"='{StatusName.Succeeded}' OR \"StatusName\"='{StatusName.Failed}') LIMIT @batchCount);", @@ -238,7 +238,7 @@ public async Task ScheduleMessagesOfDelayedAsync(Func @@ -296,7 +296,7 @@ private async Task ChangeMessageStateAsync(string tableName, MediumMessage messa } else { - await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await using var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } @@ -308,7 +308,7 @@ private async Task StoreReceivedMessage(object[] sqlParams) $"INSERT INTO {_recName}(\"Id\",\"Version\",\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING \"Id\";"; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } @@ -327,7 +327,7 @@ private async Task> GetMessagesOfNeedRetryAsync(strin new NpgsqlParameter("@Added", fourMinAgo) }; - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); var result = await connection.ExecuteReaderAsync(sql, async reader => { diff --git a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs index 5ef6a9287..e7c994508 100644 --- a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs @@ -61,7 +61,7 @@ SELECT COUNT(""Id"") FROM {_recName} WHERE ""StatusName"" = N'Failed' SELECT COUNT(""Id"") FROM {_pubName} WHERE ""StatusName"" = N'Delayed' ) AS ""PublishedDelayed"";"; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); var statistics = await connection.ExecuteReaderAsync(sql, async reader => { @@ -98,7 +98,7 @@ public async Task> GetMessagesAsync(MessageQueryDto var sqlQuery = $"select * from {tableName} where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit"; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); var count = await connection.ExecuteScalarAsync($"select count(1) from {tableName} where 1=1 {where}", @@ -182,7 +182,7 @@ private async ValueTask GetNumberOfMessage(string tableName, string statusN var sqlQuery = $"select count(\"Id\") from {tableName} where Lower(\"StatusName\") = Lower(@state)"; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); return await connection.ExecuteScalarAsync(sqlQuery, new NpgsqlParameter("@state", statusName)) .ConfigureAwait(false); @@ -227,7 +227,7 @@ group by to_char(""Added"", 'yyyy-MM-dd-HH') }; Dictionary valuesMap; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using (connection.ConfigureAwait(false)) { valuesMap = await connection.ExecuteReaderAsync(sqlQuery, async reader => @@ -264,7 +264,7 @@ group by to_char(""Added"", 'yyyy-MM-dd-HH') var sql = $@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED"; - var connection = new NpgsqlConnection(_options.ConnectionString); + var connection = _options.CreateConnection(); await using var _ = connection.ConfigureAwait(false); var mediumMessage = await connection.ExecuteReaderAsync(sql, async reader => { diff --git a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs index 2fd25aba9..580a26c37 100644 --- a/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IStorageInitializer.PostgreSql.cs @@ -46,7 +46,7 @@ public async Task InitializeAsync(CancellationToken cancellationToken) if (cancellationToken.IsCancellationRequested) return; var sql = CreateDbTablesScript(_options.Value.Schema); - var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var connection = _options.Value.CreateConnection(); await using var _ = connection.ConfigureAwait(false); object[] sqlParams = {