Skip to content

Commit

Permalink
added sending functionality and updated docs
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonshave committed Jun 20, 2022
1 parent 8b27fba commit 4df50cb
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 15 deletions.
48 changes: 39 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ You will need to create an Azure Storage account in the Azure portal using a uni

## Message handling behavior

- Sending a message will automatically serialize the payload to `BinaryData`.
- Multiple messages are pulled when `ReceiveMessagesAsync<T>` is called.
- If your handler does not throw, messages are automatically removed from the queue otherwise the message is returned to the queue for delivery again.
- Deserialization uses the `System.Text.Json` deserialization behavior. This can be overridden by specifying your own `JsonSerializerOptions` as seen below.
- Automatic deserialization uses the `System.Text.Json` deserialization behavior. This can be overridden by specifying your own `JsonSerializerOptions` as seen below.
- You can 'peek' messages using `PeekMessages<T>` which returns a collection but doesn't remove them from the queue.

## Usage
## Configuration

1. Add the Nuget package `JasonShave.AzureStorage.QueueService` to your .NET project
2. Set your `ConnectionString` and `QueueName` properties in your [.NET User Secrets store](https://docs.microsoft.com/en-us/aspnet/core/security/app-secrets?view=aspnetcore-6.0&tabs=windows), `appsettings.json`, or anywhere your `IConfiguration` provider can look for the `QueueClientSettings`. For example:
Expand Down Expand Up @@ -48,15 +49,44 @@ You will need to create an Azure Storage account in the Azure portal using a uni
serializationOptions => serializationOptions.AllowTrailingCommas = true);
```

4. Inject the `IQueueService` interface and use as follows:
## Sending messages to an Azure storage account queue

The following example shows the .NET "Worker Service" template where the class uses the `IHostedService` interface to run a particular code block repeatedly. The application will send the payload to the queue every five seconds.

1. Inject the `IQueueService` interface and use as follows:

```csharp
public class Sender : IHostedService
{
private readonly IQueueService _queueService;

public Sender(IQueueService queueService) => _queueService = queueService;

public async Task StartAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var myMessage = new MyMessage("Test");
await _queueService.SendMessageAsync<MyMessage>(myMessage, cancellationToken);
Task.Delay(5000);
}
}
}
```

## Receiving and handling messages from an Azure storage account queue

The following example shows the .NET "Worker Service" template where the class uses the `IHostedService` interface to run a particular code block repeatedly. The application will receive the payload from the queue repeatedly.

1. Inject the `IQueueService` interface and use as follows:

```csharp
public class Worker : IHostedService
public class Receiver : IHostedService
{
private readonly IQueueService _queueService;
private readonly IMyMessageHandler _myMessageHandler; // see optional handler below

public Worker(IQueueService queueService, IMyMessageHandler myMessageHandler)
public Receiver(IQueueService queueService, IMyMessageHandler myMessageHandler)
{
_queueService = queueService;
_myMessageHandler = myMessageHandler;
Expand All @@ -75,9 +105,9 @@ You will need to create an Azure Storage account in the Azure portal using a uni
}
```

5. Create your own message handler (optional)
2. Create your own message handler (optional)

The `ReceiveMessagesAsync<T>` method has two `HandleAsync()` methods. The first one handles the `<T>` message type you specify, and the second handles an `Exception` type. These can be implemented as follows:
The library has a single `ReceiveMessagesAsync<T>` method which takes two function delegates which allow you to specify your own message handling and exception handling. The first one handles the `<T>` message type you specify, and the second handles an `Exception` type. These can be implemented as follows:

```csharp
public interface IMyMessageHandler
Expand All @@ -88,8 +118,8 @@ You will need to create an Azure Storage account in the Azure portal using a uni

public class MyMessageHandler : IMyMessageHandler
{
public async Task HandleAsync(MyMessage message) => // do work
public async Task HandleExceptionAsync(Exception exception) => // handle exception
public async Task HandleAsync(MyMessage message) => Console.WriteLine(message);
public async Task HandleExceptionAsync(Exception exception) => Console.WriteLine(exception);
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,10 @@ public JsonQueueMessageConverter(JsonSerializerOptions? serializerOptions = null
throw new DeserializationException(e.Message);
}
}

public BinaryData Convert<TInput>(TInput input)
{
var binaryData = new BinaryData(input);
return binaryData;
}
}
2 changes: 2 additions & 0 deletions src/AzureStorage.QueueService/Interfaces/IMessageConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ internal interface IMessageConverter
TOutput? Convert<TOutput>(BinaryData input);

TOutput? Convert<TOutput>(string input);

BinaryData Convert<TInput>(TInput input);
}
6 changes: 5 additions & 1 deletion src/AzureStorage.QueueService/Interfaces/IQueueService.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
namespace JasonShave.AzureStorage.QueueService.Interfaces;
using JasonShave.AzureStorage.QueueService.Models;

namespace JasonShave.AzureStorage.QueueService.Interfaces;

public interface IQueueService
{
Task<IEnumerable<TMessage>> PeekMessages<TMessage>(int numMessages, CancellationToken cancellationToken = default);

Task ReceiveMessagesAsync<TMessage>(Func<TMessage?, Task> handleMessage, Func<Exception, Task> handleException, CancellationToken cancellationToken = default)
where TMessage : class;

Task<SendResponse> SendMessageAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default);
}
3 changes: 3 additions & 0 deletions src/AzureStorage.QueueService/Models/SendResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace JasonShave.AzureStorage.QueueService.Models;

public record SendResponse(string Receipt, string MessageId);
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using JasonShave.AzureStorage.QueueService.Interfaces;
using JasonShave.AzureStorage.QueueService.Models;
using Microsoft.Extensions.Logging;

namespace JasonShave.AzureStorage.QueueService.Services;
Expand Down Expand Up @@ -68,4 +69,12 @@ async Task ProcessMessage(QueueMessage queueMessage)
}
}
}

public async Task<SendResponse> SendMessageAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
{
BinaryData binaryMessage = _queueMessageConverter.Convert(message);
SendReceipt response = await _queueClient.SendMessageAsync(binaryMessage, null, null, cancellationToken);

return new SendResponse(response.PopReceipt, response.MessageId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Azure;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Castle.Components.DictionaryAdapter;
using FluentAssertions;
using JasonShave.AzureStorage.QueueService.Interfaces;
using JasonShave.AzureStorage.QueueService.Models;
Expand Down Expand Up @@ -90,21 +91,21 @@ public async Task Receive_Messages_Throws_Collection()
{
// arrange
var fixture = new Fixture();
Response mockResponse = Mock.Of<Response>();
var mockResponse = Mock.Of<Response>();
var queueMessage = QueuesModelFactory.QueueMessage("1", "2", "test_text", 1);
QueueMessage[] peekedMessages = { queueMessage };
var response = Response.FromValue(peekedMessages, mockResponse);

var queueClient = new Mock<QueueClient>(_queueClientSettings.ConnectionString, _queueClientSettings.QueueName);
queueClient.Setup(x => x.ReceiveMessagesAsync(CancellationToken.None)).ReturnsAsync(response);
queueClient.Setup(x => x.DeleteMessageAsync(It.IsAny<string>(), It.IsAny<string>(), CancellationToken.None));
var mockQueueClient = new Mock<QueueClient>(_queueClientSettings.ConnectionString, _queueClientSettings.QueueName);
mockQueueClient.Setup(x => x.ReceiveMessagesAsync(CancellationToken.None)).ReturnsAsync(response);
mockQueueClient.Setup(x => x.DeleteMessageAsync(It.IsAny<string>(), It.IsAny<string>(), CancellationToken.None));

var mockMessageConverter = new Mock<IMessageConverter>();
mockMessageConverter.Setup(x => x.Convert<TestObject>(It.IsAny<string>())).Returns(fixture.Create<TestObject>());

var mockLogger = new Mock<ILogger<AzureStorageQueueService>>();

var subject = new AzureStorageQueueService(mockMessageConverter.Object, queueClient.Object,
var subject = new AzureStorageQueueService(mockMessageConverter.Object, mockQueueClient.Object,
mockLogger.Object);

// act/assert
Expand All @@ -116,4 +117,31 @@ await subject.ReceiveMessagesAsync<TestObject>(
return Task.CompletedTask;
});
}

[Fact(DisplayName = "Can send message")]
public async Task Can_Send_Message()
{
// arrange
var fixture = new Fixture();
var testObject = fixture.Create<TestObject>();

var mockQueueClient = new Mock<QueueClient>(_queueClientSettings.ConnectionString, _queueClientSettings.QueueName);
var mockResponse = Mock.Of<Response>();
var mockLogger = new Mock<ILogger<AzureStorageQueueService>>();
var mockMessageConverter = new Mock<IMessageConverter>();

mockMessageConverter.Setup(x => x.Convert(testObject)).Returns(It.IsAny<BinaryData>());

SendReceipt sendReceipt =
QueuesModelFactory.SendReceipt("1", DateTimeOffset.Now, DateTimeOffset.Now, "2", DateTimeOffset.Now);
var response = Response.FromValue(sendReceipt, mockResponse);
mockQueueClient.Setup(x => x.SendMessageAsync(It.IsAny<BinaryData>(), null, null, CancellationToken.None)).ReturnsAsync(response);

var subject = new AzureStorageQueueService(mockMessageConverter.Object, mockQueueClient.Object,
mockLogger.Object);

// act/assert
await subject.Invoking(x => x.SendMessageAsync(testObject)).Should().NotThrowAsync();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,37 @@ public void Convert_Empty_Binary_Returns_Null()
// act/assert
subject.Invoking(x => x.Convert<TestObject>(string.Empty)).Should().Throw<DeserializationException>();
}

[Fact(DisplayName = "Convert object to BinaryData")]
public void Convert_Object_To_BinaryData()
{
// arrange
var fixture = new Fixture();
var testObject = fixture.Create<TestObject>();

var subject = new JsonQueueMessageConverter();

// act
BinaryData binaryData = subject.Convert(testObject);

// assert
binaryData.Should().BeOfType(typeof(BinaryData));
}

[Fact(DisplayName = "Convert from Object to BinaryData and back to Object")]
public void Convert_To_And_From()
{
// arrange
var fixture = new Fixture();
var testObject = fixture.Create<TestObject>();

var subject = new JsonQueueMessageConverter();

// act
BinaryData binaryData = subject.Convert(testObject);
TestObject? convertedTestObject = subject.Convert<TestObject>(binaryData);

// assert
testObject.Should().BeEquivalentTo(convertedTestObject);
}
}

0 comments on commit 4df50cb

Please sign in to comment.