From 772edb5428618c6888b0e396ee09965e78a5af2f Mon Sep 17 00:00:00 2001 From: witskeeper Date: Sat, 18 Nov 2023 13:38:32 +0800 Subject: [PATCH] ConsulWorkerIdGenerator Support HealthChecks --- Directory.Build.targets | 2 +- .../ConsulWorkerIdGenerator.cs | 57 ++++++++++++++---- ...ConsulWorkerIdGeneratorBuilderExtension.cs | 7 +++ .../ConsulWorkerIdGeneratorOptions.cs | 6 +- ...CorePal.Extensions.Snowflake.Consul.csproj | 1 + .../ConsulWorkerIdGeneratorTests.cs | 59 ++++++++++++++++++- 6 files changed, 118 insertions(+), 14 deletions(-) diff --git a/Directory.Build.targets b/Directory.Build.targets index ce8de3e..9bde874 100644 --- a/Directory.Build.targets +++ b/Directory.Build.targets @@ -39,7 +39,7 @@ - + diff --git a/src/Snowflake.Consul/ConsulWorkerIdGenerator.cs b/src/Snowflake.Consul/ConsulWorkerIdGenerator.cs index c4cac7d..73c1c8a 100644 --- a/src/Snowflake.Consul/ConsulWorkerIdGenerator.cs +++ b/src/Snowflake.Consul/ConsulWorkerIdGenerator.cs @@ -1,12 +1,13 @@ using System.Globalization; using Consul; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace NetCorePal.Extensions.Snowflake.Consul { - public sealed class ConsulWorkerIdGenerator : BackgroundService, IWorkIdGenerator + public sealed class ConsulWorkerIdGenerator : BackgroundService, IWorkIdGenerator, IHealthCheck { private const string SessionName = "snowflake-workerid-session"; private readonly TimeSpan _sessionTtl; @@ -16,6 +17,10 @@ public sealed class ConsulWorkerIdGenerator : BackgroundService, IWorkIdGenerato private string _sessionId = string.Empty; + + public bool IsHealth { get; private set; } = false; + + // work id private readonly long? _workId; @@ -62,6 +67,7 @@ private async Task GenWorkId() { if (await TryLockWorkId(_sessionId, i)) { + this.IsHealth = true; return i; } } @@ -72,18 +78,27 @@ private async Task GenWorkId() private async Task TryLockWorkId(string sessionId, long workId) { - KVPair kvp = new(GetWorkerIdKey(workId)) + try { - Session = sessionId, - Value = System.Text.Encoding.UTF8.GetBytes( - $"{WorkId_Identity_Name},{DateTime.Now.ToString(CultureInfo.InvariantCulture)}") - }; - _logger.LogInformation("尝试使用key: {Key} 获取锁", kvp.Key); - var result = await _consulClient.KV.Acquire(kvp); - if (result.Response) + KVPair kvp = new(GetWorkerIdKey(workId)) + { + Session = sessionId, + Value = System.Text.Encoding.UTF8.GetBytes( + $"{WorkId_Identity_Name},{DateTime.Now.ToString(CultureInfo.InvariantCulture)}") + }; + _logger.LogInformation("尝试使用key: {Key} 获取锁", kvp.Key); + var result = await _consulClient.KV.Acquire(kvp); + if (result.Response) + { + _logger.LogInformation("获取到workerId:{workerId}", workId); + return true; + } + } + catch (Exception e) { - _logger.LogInformation("获取到workerId:{workerId}", workId); - return true; + _logger.LogError(e, "尝试获取workerId时出错"); + this.IsHealth = false; + throw; } return false; @@ -108,9 +123,11 @@ public async Task Refresh(CancellationToken stoppingToken = default) _sessionId = await CreateSession(); if (!await TryLockWorkId(_sessionId, _workId.Value)) { + this.IsHealth = false; throw new WorkerIdConflictException($"使用新会话{_sessionId}抢占workerId:{_workId}失败"); } } + var result = await _consulClient.KV.Get(GetWorkerIdKey(), stoppingToken); _logger.LogInformation("成功刷新会话,sessionId:{sessionId},值为 {value}, 当前workerid:{workId}.", _sessionId, result.Response, _workId); @@ -160,5 +177,23 @@ public override async Task StopAsync(CancellationToken cancellationToken) } #endregion + + #region HealthCheck + + public Task CheckHealthAsync(HealthCheckContext context, + CancellationToken cancellationToken = new CancellationToken()) + { + if (this.IsHealth) + { + return Task.FromResult(HealthCheckResult.Healthy()); + } + else + { + return Task.FromResult(new HealthCheckResult(_options.UnhealthyStatus, + $"workerId: {_workId} 对应的consul key锁定失败")); + } + } + + #endregion } } \ No newline at end of file diff --git a/src/Snowflake.Consul/ConsulWorkerIdGeneratorBuilderExtension.cs b/src/Snowflake.Consul/ConsulWorkerIdGeneratorBuilderExtension.cs index 5af0d44..cc5e00e 100644 --- a/src/Snowflake.Consul/ConsulWorkerIdGeneratorBuilderExtension.cs +++ b/src/Snowflake.Consul/ConsulWorkerIdGeneratorBuilderExtension.cs @@ -16,5 +16,12 @@ public static IServiceCollection AddConsulWorkerIdGenerator(this IServiceCollect services.AddHostedService(p => p.GetRequiredService()); return services; } + + public static IServiceCollection AddConsulWorkerIdGeneratorHealthCheck(this IHealthChecksBuilder builder, + string name = "ConsulWorkerIdGenerator") + { + builder.AddCheck(name); + return builder.Services; + } } } \ No newline at end of file diff --git a/src/Snowflake.Consul/ConsulWorkerIdGeneratorOptions.cs b/src/Snowflake.Consul/ConsulWorkerIdGeneratorOptions.cs index c9cded9..343bd72 100644 --- a/src/Snowflake.Consul/ConsulWorkerIdGeneratorOptions.cs +++ b/src/Snowflake.Consul/ConsulWorkerIdGeneratorOptions.cs @@ -1,4 +1,6 @@ -namespace NetCorePal.Extensions.Snowflake.Consul +using Microsoft.Extensions.Diagnostics.HealthChecks; + +namespace NetCorePal.Extensions.Snowflake.Consul { public class ConsulWorkerIdGeneratorOptions { @@ -6,5 +8,7 @@ public class ConsulWorkerIdGeneratorOptions public string ConsulKeyPrefix { get; set; } = string.Empty; public int SessionTtlSeconds { get; set; } = 60; public int SessionRefreshIntervalSeconds { get; set; } = 15; + + public HealthStatus UnhealthyStatus { get; set; } = HealthStatus.Unhealthy; } } \ No newline at end of file diff --git a/src/Snowflake.Consul/NetCorePal.Extensions.Snowflake.Consul.csproj b/src/Snowflake.Consul/NetCorePal.Extensions.Snowflake.Consul.csproj index ca5b3cc..aa5b16d 100644 --- a/src/Snowflake.Consul/NetCorePal.Extensions.Snowflake.Consul.csproj +++ b/src/Snowflake.Consul/NetCorePal.Extensions.Snowflake.Consul.csproj @@ -10,6 +10,7 @@ + diff --git a/test/NetCorePal.Extensions.Snowflake.Consul.UnitTests/ConsulWorkerIdGeneratorTests.cs b/test/NetCorePal.Extensions.Snowflake.Consul.UnitTests/ConsulWorkerIdGeneratorTests.cs index 9042554..1e943bc 100644 --- a/test/NetCorePal.Extensions.Snowflake.Consul.UnitTests/ConsulWorkerIdGeneratorTests.cs +++ b/test/NetCorePal.Extensions.Snowflake.Consul.UnitTests/ConsulWorkerIdGeneratorTests.cs @@ -1,5 +1,7 @@ using Consul; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Testcontainers.Consul; +using HealthStatus = Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus; namespace NetCorePal.Extensions.Snowflake.Consul.UnitTests; @@ -32,6 +34,7 @@ public void GetId_Test() [Fact] public async Task Refresh_Throw_Exception_When_Session_Locked_By_Others_Test() { + HealthCheckContext healthCheckContext = new HealthCheckContext(); var consulWorkerIdGenerator = CreateConsulWorkerIdGenerator(p => { p.AppName = "timeout-app"; @@ -41,6 +44,8 @@ public async Task Refresh_Throw_Exception_When_Session_Locked_By_Others_Test() var id = consulWorkerIdGenerator.GetId(); Assert.Equal(0, id); + Assert.True(consulWorkerIdGenerator.IsHealth); + Assert.Equal(HealthStatus.Healthy, consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status); var releaseResult = await _consulClient.KV.Release(new KVPair(consulWorkerIdGenerator.GetWorkerIdKey()) { @@ -64,12 +69,61 @@ public async Task Refresh_Throw_Exception_When_Session_Locked_By_Others_Test() Assert.Equal(0, consulWorkerIdGenerator2.GetId()); await Assert.ThrowsAsync(async () => await consulWorkerIdGenerator.Refresh()); + Assert.False(consulWorkerIdGenerator.IsHealth); + Assert.Equal(HealthStatus.Unhealthy, + consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status); + } + + + [Fact] + public async Task Refresh_Throw_Exception_When_Session_Locked_By_Others_And_UnhealthyStatus_Degraded_Test() + { + HealthCheckContext healthCheckContext = new HealthCheckContext(); + var consulWorkerIdGenerator = CreateConsulWorkerIdGenerator(p => + { + p.AppName = "timeout-app-degraded"; + p.SessionTtlSeconds = 10; + p.SessionRefreshIntervalSeconds = 5; + p.UnhealthyStatus = HealthStatus.Degraded; + }); + + var id = consulWorkerIdGenerator.GetId(); + Assert.Equal(0, id); + Assert.True(consulWorkerIdGenerator.IsHealth); + Assert.Equal(HealthStatus.Healthy, consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status); + + var releaseResult = await _consulClient.KV.Release(new KVPair(consulWorkerIdGenerator.GetWorkerIdKey()) + { + Session = consulWorkerIdGenerator.CurrentSessionId + }); + Assert.True(releaseResult.Response); + + var destoryResult = await _consulClient.Session.Destroy(consulWorkerIdGenerator.CurrentSessionId); + + Assert.True(destoryResult.Response); + + var deleteResult = await _consulClient.KV.Delete(consulWorkerIdGenerator.GetWorkerIdKey()); + Assert.True(deleteResult.Response); + + var consulWorkerIdGenerator2 = CreateConsulWorkerIdGenerator(p => + { + p.AppName = "timeout-app-degraded"; + p.SessionTtlSeconds = 10; + p.SessionRefreshIntervalSeconds = 5; + }); + Assert.Equal(0, consulWorkerIdGenerator2.GetId()); + + await Assert.ThrowsAsync(async () => await consulWorkerIdGenerator.Refresh()); + Assert.False(consulWorkerIdGenerator.IsHealth); + Assert.Equal(HealthStatus.Degraded, + consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status); } [Fact] public async Task Refresh_OK_When_Session_Released_And_Not_Lock_By_Others_Test() { + HealthCheckContext healthCheckContext = new HealthCheckContext(); var consulWorkerIdGenerator = CreateConsulWorkerIdGenerator(p => { p.AppName = "timeout-app-nolock"; @@ -79,7 +133,8 @@ public async Task Refresh_OK_When_Session_Released_And_Not_Lock_By_Others_Test() var id = consulWorkerIdGenerator.GetId(); Assert.Equal(0, id); - + Assert.True(consulWorkerIdGenerator.IsHealth); + Assert.Equal(HealthStatus.Healthy, consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status); var releaseResult = await _consulClient.KV.Release(new KVPair(consulWorkerIdGenerator.GetWorkerIdKey()) { @@ -103,6 +158,8 @@ public async Task Refresh_OK_When_Session_Released_And_Not_Lock_By_Others_Test() p.SessionRefreshIntervalSeconds = 5; }); Assert.Equal(1, consulWorkerIdGenerator2.GetId()); + Assert.True(consulWorkerIdGenerator2.IsHealth); + Assert.Equal(HealthStatus.Healthy, consulWorkerIdGenerator2.CheckHealthAsync(healthCheckContext).Result.Status); } [Fact]