Skip to content

Commit

Permalink
ConsulWorkerIdGenerator Support HealthChecks
Browse files Browse the repository at this point in the history
  • Loading branch information
witskeeper committed Nov 18, 2023
1 parent 9a6855b commit 772edb5
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.targets
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
<PackageReference Update="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="$(FrameworkVersion)"/>
<PackageReference Update="Microsoft.AspNetCore.Authentication.Certificate" Version="$(FrameworkVersion)"/>
<PackageReference Update="Microsoft.AspNetCore.DataProtection.StackExchangeRedis" Version="$(FrameworkVersion)" />

<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="$(FrameworkVersion)"/>
<!--microsoft entity framework -->
<PackageReference Update="Microsoft.EntityFrameworkCore" Version="$(EntityFrameworkVersion)"/>
<PackageReference Update="Microsoft.EntityFrameworkCore.Relational" Version="$(EntityFrameworkVersion)"/>
Expand Down
57 changes: 46 additions & 11 deletions src/Snowflake.Consul/ConsulWorkerIdGenerator.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System.Globalization;
using Consul;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace NetCorePal.Extensions.Snowflake.Consul
{
public sealed class ConsulWorkerIdGenerator : BackgroundService, IWorkIdGenerator
public sealed class ConsulWorkerIdGenerator : BackgroundService, IWorkIdGenerator, IHealthCheck
{
private const string SessionName = "snowflake-workerid-session";
private readonly TimeSpan _sessionTtl;
Expand All @@ -16,6 +17,10 @@ public sealed class ConsulWorkerIdGenerator : BackgroundService, IWorkIdGenerato

private string _sessionId = string.Empty;


public bool IsHealth { get; private set; } = false;


// work id
private readonly long? _workId;

Expand Down Expand Up @@ -62,6 +67,7 @@ private async Task<long> GenWorkId()
{
if (await TryLockWorkId(_sessionId, i))
{
this.IsHealth = true;
return i;
}
}
Expand All @@ -72,18 +78,27 @@ private async Task<long> GenWorkId()

private async Task<bool> TryLockWorkId(string sessionId, long workId)
{
KVPair kvp = new(GetWorkerIdKey(workId))
try
{
Session = sessionId,
Value = System.Text.Encoding.UTF8.GetBytes(
$"{WorkId_Identity_Name},{DateTime.Now.ToString(CultureInfo.InvariantCulture)}")
};
_logger.LogInformation("尝试使用key: {Key} 获取锁", kvp.Key);
var result = await _consulClient.KV.Acquire(kvp);
if (result.Response)
KVPair kvp = new(GetWorkerIdKey(workId))
{
Session = sessionId,
Value = System.Text.Encoding.UTF8.GetBytes(
$"{WorkId_Identity_Name},{DateTime.Now.ToString(CultureInfo.InvariantCulture)}")
};
_logger.LogInformation("尝试使用key: {Key} 获取锁", kvp.Key);
var result = await _consulClient.KV.Acquire(kvp);
if (result.Response)
{
_logger.LogInformation("获取到workerId:{workerId}", workId);
return true;
}
}
catch (Exception e)
{
_logger.LogInformation("获取到workerId:{workerId}", workId);
return true;
_logger.LogError(e, "尝试获取workerId时出错");
this.IsHealth = false;
throw;
}

return false;
Expand All @@ -108,9 +123,11 @@ public async Task Refresh(CancellationToken stoppingToken = default)
_sessionId = await CreateSession();
if (!await TryLockWorkId(_sessionId, _workId.Value))

Check warning on line 124 in src/Snowflake.Consul/ConsulWorkerIdGenerator.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.

Check warning on line 124 in src/Snowflake.Consul/ConsulWorkerIdGenerator.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.

Check warning on line 124 in src/Snowflake.Consul/ConsulWorkerIdGenerator.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.

Check warning on line 124 in src/Snowflake.Consul/ConsulWorkerIdGenerator.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.

Check warning on line 124 in src/Snowflake.Consul/ConsulWorkerIdGenerator.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.

Check warning on line 124 in src/Snowflake.Consul/ConsulWorkerIdGenerator.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.

Check warning on line 124 in src/Snowflake.Consul/ConsulWorkerIdGenerator.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.

Check warning on line 124 in src/Snowflake.Consul/ConsulWorkerIdGenerator.cs

View workflow job for this annotation

GitHub Actions / build

Nullable value type may be null.
{
this.IsHealth = false;
throw new WorkerIdConflictException($"使用新会话{_sessionId}抢占workerId:{_workId}失败");
}
}

var result = await _consulClient.KV.Get(GetWorkerIdKey(), stoppingToken);
_logger.LogInformation("成功刷新会话,sessionId:{sessionId},值为 {value}, 当前workerid:{workId}.",
_sessionId, result.Response, _workId);
Expand Down Expand Up @@ -160,5 +177,23 @@ public override async Task StopAsync(CancellationToken cancellationToken)
}

#endregion

#region HealthCheck

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = new CancellationToken())
{
if (this.IsHealth)
{
return Task.FromResult(HealthCheckResult.Healthy());
}
else
{
return Task.FromResult(new HealthCheckResult(_options.UnhealthyStatus,
$"workerId: {_workId} 对应的consul key锁定失败"));
}
}

#endregion
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,12 @@ public static IServiceCollection AddConsulWorkerIdGenerator(this IServiceCollect
services.AddHostedService(p => p.GetRequiredService<ConsulWorkerIdGenerator>());
return services;
}

public static IServiceCollection AddConsulWorkerIdGeneratorHealthCheck(this IHealthChecksBuilder builder,
string name = "ConsulWorkerIdGenerator")
{
builder.AddCheck<ConsulWorkerIdGenerator>(name);
return builder.Services;
}
}
}
6 changes: 5 additions & 1 deletion src/Snowflake.Consul/ConsulWorkerIdGeneratorOptions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
namespace NetCorePal.Extensions.Snowflake.Consul
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace NetCorePal.Extensions.Snowflake.Consul
{
public class ConsulWorkerIdGeneratorOptions
{
public string AppName { get; set; } = "myAppName";
public string ConsulKeyPrefix { get; set; } = string.Empty;
public int SessionTtlSeconds { get; set; } = 60;
public int SessionRefreshIntervalSeconds { get; set; } = 15;

public HealthStatus UnhealthyStatus { get; set; } = HealthStatus.Unhealthy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Snowflake\NetCorePal.Extensions.Snowflake.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Consul;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Testcontainers.Consul;
using HealthStatus = Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus;

namespace NetCorePal.Extensions.Snowflake.Consul.UnitTests;

Expand Down Expand Up @@ -32,6 +34,7 @@ public void GetId_Test()
[Fact]
public async Task Refresh_Throw_Exception_When_Session_Locked_By_Others_Test()
{
HealthCheckContext healthCheckContext = new HealthCheckContext();
var consulWorkerIdGenerator = CreateConsulWorkerIdGenerator(p =>
{
p.AppName = "timeout-app";
Expand All @@ -41,6 +44,8 @@ public async Task Refresh_Throw_Exception_When_Session_Locked_By_Others_Test()

var id = consulWorkerIdGenerator.GetId();
Assert.Equal(0, id);
Assert.True(consulWorkerIdGenerator.IsHealth);
Assert.Equal(HealthStatus.Healthy, consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status);

var releaseResult = await _consulClient.KV.Release(new KVPair(consulWorkerIdGenerator.GetWorkerIdKey())
{
Expand All @@ -64,12 +69,61 @@ public async Task Refresh_Throw_Exception_When_Session_Locked_By_Others_Test()
Assert.Equal(0, consulWorkerIdGenerator2.GetId());

await Assert.ThrowsAsync<WorkerIdConflictException>(async () => await consulWorkerIdGenerator.Refresh());
Assert.False(consulWorkerIdGenerator.IsHealth);
Assert.Equal(HealthStatus.Unhealthy,
consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status);
}


[Fact]
public async Task Refresh_Throw_Exception_When_Session_Locked_By_Others_And_UnhealthyStatus_Degraded_Test()
{
HealthCheckContext healthCheckContext = new HealthCheckContext();
var consulWorkerIdGenerator = CreateConsulWorkerIdGenerator(p =>
{
p.AppName = "timeout-app-degraded";
p.SessionTtlSeconds = 10;
p.SessionRefreshIntervalSeconds = 5;
p.UnhealthyStatus = HealthStatus.Degraded;
});

var id = consulWorkerIdGenerator.GetId();
Assert.Equal(0, id);
Assert.True(consulWorkerIdGenerator.IsHealth);
Assert.Equal(HealthStatus.Healthy, consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status);

var releaseResult = await _consulClient.KV.Release(new KVPair(consulWorkerIdGenerator.GetWorkerIdKey())
{
Session = consulWorkerIdGenerator.CurrentSessionId
});
Assert.True(releaseResult.Response);

var destoryResult = await _consulClient.Session.Destroy(consulWorkerIdGenerator.CurrentSessionId);

Assert.True(destoryResult.Response);

var deleteResult = await _consulClient.KV.Delete(consulWorkerIdGenerator.GetWorkerIdKey());
Assert.True(deleteResult.Response);

var consulWorkerIdGenerator2 = CreateConsulWorkerIdGenerator(p =>
{
p.AppName = "timeout-app-degraded";
p.SessionTtlSeconds = 10;
p.SessionRefreshIntervalSeconds = 5;
});
Assert.Equal(0, consulWorkerIdGenerator2.GetId());

await Assert.ThrowsAsync<WorkerIdConflictException>(async () => await consulWorkerIdGenerator.Refresh());
Assert.False(consulWorkerIdGenerator.IsHealth);
Assert.Equal(HealthStatus.Degraded,
consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status);
}


[Fact]
public async Task Refresh_OK_When_Session_Released_And_Not_Lock_By_Others_Test()
{
HealthCheckContext healthCheckContext = new HealthCheckContext();
var consulWorkerIdGenerator = CreateConsulWorkerIdGenerator(p =>
{
p.AppName = "timeout-app-nolock";
Expand All @@ -79,7 +133,8 @@ public async Task Refresh_OK_When_Session_Released_And_Not_Lock_By_Others_Test()

var id = consulWorkerIdGenerator.GetId();
Assert.Equal(0, id);

Assert.True(consulWorkerIdGenerator.IsHealth);
Assert.Equal(HealthStatus.Healthy, consulWorkerIdGenerator.CheckHealthAsync(healthCheckContext).Result.Status);

var releaseResult = await _consulClient.KV.Release(new KVPair(consulWorkerIdGenerator.GetWorkerIdKey())
{
Expand All @@ -103,6 +158,8 @@ public async Task Refresh_OK_When_Session_Released_And_Not_Lock_By_Others_Test()
p.SessionRefreshIntervalSeconds = 5;
});
Assert.Equal(1, consulWorkerIdGenerator2.GetId());
Assert.True(consulWorkerIdGenerator2.IsHealth);
Assert.Equal(HealthStatus.Healthy, consulWorkerIdGenerator2.CheckHealthAsync(healthCheckContext).Result.Status);
}

[Fact]
Expand Down

0 comments on commit 772edb5

Please sign in to comment.