Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure service remains healthy with rare messages. #974

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Motor.Extensions.Hosting/Internal/BackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public BackgroundTaskQueue(IMetricsFactory<BackgroundTaskQueue<T>>? metricsFacto
_elementsInQueue = metricsFactory?.CreateGauge("task_queue_enqueued_elements", "", false, "type")
?.WithLabels(typeof(T).Name);
_totalMessages = metricsFactory?.CreateCounter("total_messages", "", false, "type")?.WithLabels(typeof(T).Name);
LastDequeuedAt = DateTimeOffset.UtcNow;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd prefer to rather have this as the default value in line 59 instead of here as a new line in the constructor.

}

public Task<ProcessedMessageStatus> QueueBackgroundWorkItem(T item)
Expand All @@ -31,6 +32,11 @@ public Task<ProcessedMessageStatus> QueueBackgroundWorkItem(T item)

var taskCompletionStatus = new TaskCompletionSource<ProcessedMessageStatus>();

if (_workItems.IsEmpty)
{
// Simulate recent dequeue to avoid HealthCheck triggering immediately
LastDequeuedAt = DateTimeOffset.UtcNow;
}
_workItems.Enqueue(new QueueItem<T>(item, taskCompletionStatus));
_elementsInQueue?.Inc();
_totalMessages?.Inc();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,71 +1,125 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Options;
using Moq;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
using Motor.Extensions.Hosting.HealthChecks;
using Motor.Extensions.Hosting.Internal;
using Motor.Extensions.TestUtilities;
using Xunit;

namespace Motor.Extensions.Hosting_UnitTest.HealthChecks;

public class MessageProcessingHealthCheckTest
{
private readonly TimeSpan _timeout = TimeSpan.FromMilliseconds(50);

[Fact]
public async void CheckHealthAsync_LastDequeuedAtExceededTimeoutRangeAndQueueNotEmpty_ServiceIsUnhealthy()
public async void CheckHealthAsync_QueueHasMessagesWithoutRecentAcknowledgement_ServiceIsUnhealthy()
{
var healthCheck = CreateHealthCheck(true, 10);
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));

Check warning on line 24 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 24 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.
await Task.Delay(_timeout * 2);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Unhealthy, result);
}

[Fact]
public async void CheckHealthAsync_LastDequeuedAtWithinTimeoutRangeAndQueueNotEmpty_ServiceIsHealthy()
public async void CheckHealthAsync_QueueHasMessagesButMessageWasRecentlyAcknowledged_ServiceIsHealthy()
{
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message0"));

Check warning on line 37 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 37 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.
await queue.DequeueAsync(CancellationToken.None);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message1"));

Check warning on line 39 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 39 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

[Fact]
public async void CheckHealthAsync_QueueRemainsEmptyLongerThanTimeout_ServiceIsHealthy()
{
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
await Task.Delay(_timeout * 2);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

[Fact]
public async void CheckHealthAsync_QueueBecomesEmptyLongerThanTimeout_ServiceIsHealthy()
{
var healthCheck = CreateHealthCheck(false, 10);
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));

Check warning on line 63 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 63 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.
await queue.DequeueAsync(CancellationToken.None);
await Task.Delay(_timeout * 2);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

[Fact]
public async void CheckHealthAsync_LastDequeuedAtExceededTimeoutRangeAndQueueEmpty_ServiceIsHealthy()
public async void CheckHealthAsync_QueueIsEmptyAndMessageWasRecentlyAcknowledged_ServiceIsHealthy()
{
var healthCheck = CreateHealthCheck(true, 0);
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));

Check warning on line 77 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

Check warning on line 77 in test/Motor.Extensions.Hosting_UnitTest/HealthChecks/MessageProcessingHealthCheckTest.cs

View workflow job for this annotation

GitHub Actions / build

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.
await queue.DequeueAsync(CancellationToken.None);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

[Fact]
public async void CheckHealthAsync_LastDequeuedAtWithinTimeoutRangeAndQueueEmpty_ServiceIsHealthy()
public async void CheckHealthAsync_MessageAppearsInQueueAfterQueueHasBeenEmpty_ServiceIsHealthy()
{
var healthCheck = CreateHealthCheck(false, 0);
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
await Task.Delay(_timeout * 2);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Healthy, result);
}

private MessageProcessingHealthCheck<string> CreateHealthCheck(bool exceededMaxTimeSinceLastProcessedMessage,
int itemCount)
[Fact]
public async void CheckHealthAsync_MessageRemainsInQueueLongerThanTimeout_ServiceIsUnhealthy()
{
var queue = CreateEmptyQueue();
var healthCheck = CreateHealthCheck(queue);
queue.QueueBackgroundWorkItem(MotorCloudEvent.CreateTestCloudEvent<string>("message"));
await Task.Delay(_timeout * 2);

var result = (await healthCheck.CheckHealthAsync(new HealthCheckContext())).Status;

Assert.Equal(HealthStatus.Unhealthy, result);
}

private MessageProcessingHealthCheck<string> CreateHealthCheck(IBackgroundTaskQueue<MotorCloudEvent<string>> queue)
{
var maxTimeSinceLastProcessedMessage = TimeSpan.FromMilliseconds(100);
var config = new MessageProcessingOptions
{
MaxTimeSinceLastProcessedMessage = maxTimeSinceLastProcessedMessage
MaxTimeSinceLastProcessedMessage = _timeout
};
var queue = new Mock<IBackgroundTaskQueue<MotorCloudEvent<string>>>();
queue.Setup(q => q.LastDequeuedAt).Returns(exceededMaxTimeSinceLastProcessedMessage
? DateTimeOffset.UtcNow.Subtract(maxTimeSinceLastProcessedMessage + TimeSpan.FromMilliseconds(50))
: DateTimeOffset.UtcNow.Subtract(maxTimeSinceLastProcessedMessage - TimeSpan.FromMilliseconds(50)));
queue.Setup(q => q.ItemCount).Returns(itemCount);
return new MessageProcessingHealthCheck<string>(
Options.Create(config),
queue.Object);
return new MessageProcessingHealthCheck<string>(Options.Create(config), queue);
}

private IBackgroundTaskQueue<MotorCloudEvent<string>> CreateEmptyQueue()
{
var queue = new BackgroundTaskQueue<MotorCloudEvent<string>>(null);
return queue;
}
}
Loading