Skip to content

Commit

Permalink
(GH-36) Tenants should share the same blob storage for large messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Jericho committed Jan 29, 2024
1 parent 94e14ef commit 0573d39
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,17 @@ namespace WorkerRole1
};

// Replace the following samples with the queues you want to monitor
messagePump.AddQueue("myfirstqueue", "myfirstqueue-poison", TimeSpan.FromMinutes(1), 3);
messagePump.AddQueue("mysecondqueue", "mysecondqueue-poison", TimeSpan.FromMinutes(1), 3);
messagePump.AddQueue("mythirdqueue", "mythirdqueue-poison", TimeSpan.FromMinutes(1), 3);
messagePump.AddQueue("queue01", "queue01-poison", TimeSpan.FromMinutes(1), 3, "queue01-oversize-messages");
messagePump.AddQueue("queue02", "queue02-poison", TimeSpan.FromMinutes(1), 3, "queue02-oversize-messages");
messagePump.AddQueue("queue03", "queue03-poison", TimeSpan.FromMinutes(1), 3, "queue03-oversize-messages");

// Queues can share the same poison queue
messagePump.AddQueue("queue04", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue04-oversize-messages");
messagePump.AddQueue("queue05", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "queue05-oversize-messages");

// Queues can also share the same blob storage for messages that exceed the max size
messagePump.AddQueue("queue06", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");
messagePump.AddQueue("queue07", "my-poison-queue", TimeSpan.FromMinutes(1), 3, "large-messages-blob");

// Start the message pump
await messagePump.StartAsync(cancellationToken);
Expand Down
16 changes: 13 additions & 3 deletions Source/Picton.Messaging/AsyncMessagePump.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using App.Metrics;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Queues;
using Microsoft.Extensions.Logging;
using Picton.Managers;
using Picton.Messaging.Utilities;
Expand Down Expand Up @@ -107,9 +109,10 @@ public AsyncMessagePump(MessagePumpOptions options, ILogger logger = null, IMetr
/// <param name="poisonQueueName">Optional. The name of the queue where poison messages are automatically moved.</param>
/// <param name="visibilityTimeout">Optional. Specifies the visibility timeout value. The default value is 30 seconds.</param>
/// <param name="maxDequeueCount">Optional. A nonzero integer value that specifies the number of time we try to process a message before giving up and declaring the message to be "poison". The default value is 3.</param>
public void AddQueue(string queueName, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3)
/// <param name="oversizeMessagesBlobStorageName">Name of the blob storage where messages that exceed the maximum size for a queue message are stored.</param>
public void AddQueue(string queueName, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, string oversizeMessagesBlobStorageName = null)
{
AddQueue(new QueueConfig(queueName, poisonQueueName, visibilityTimeout, maxDequeueCount));
AddQueue(new QueueConfig(queueName, poisonQueueName, visibilityTimeout, maxDequeueCount, oversizeMessagesBlobStorageName));
}

/// <summary>
Expand All @@ -121,7 +124,14 @@ public void AddQueue(QueueConfig queueConfig)
if (string.IsNullOrEmpty(queueConfig.QueueName)) throw new ArgumentNullException(nameof(queueConfig.QueueName));
if (queueConfig.MaxDequeueCount < 1) throw new ArgumentOutOfRangeException(nameof(queueConfig.MaxDequeueCount), "Number of retries must be greater than zero.");

var queueManager = new QueueManager(_messagePumpOptions.ConnectionString, queueConfig.QueueName, true, _messagePumpOptions.QueueClientOptions, _messagePumpOptions.BlobClientOptions);
// ----------------------------------------------------------------------------------------------------
// When Picton 9.2.0 is released, when can replace the following few lines with the following single line:
// var queueManager = new QueueManager(_messagePumpOptions.ConnectionString, queueConfig.QueueName, queueConfig.OversizedMessagesBlobStorageName, true, _messagePumpOptions.QueueClientOptions, _messagePumpOptions.BlobClientOptions);
var blobStorageName = string.IsNullOrEmpty(queueConfig.OversizedMessagesBlobStorageName) ? $"{queueConfig.QueueName}-oversize-messages" : queueConfig.OversizedMessagesBlobStorageName;
var blobClient = new BlobContainerClient(_messagePumpOptions.ConnectionString, blobStorageName, _messagePumpOptions.BlobClientOptions);
var queueClient = new QueueClient(_messagePumpOptions.ConnectionString, queueConfig.QueueName, _messagePumpOptions.QueueClientOptions);
var queueManager = new QueueManager(blobClient, queueClient, true);

var poisonQueueManager = string.IsNullOrEmpty(queueConfig.PoisonQueueName) ? null : new QueueManager(_messagePumpOptions.ConnectionString, queueConfig.PoisonQueueName, true, _messagePumpOptions.QueueClientOptions, _messagePumpOptions.BlobClientOptions);

AddQueue(queueManager, poisonQueueManager, queueConfig.VisibilityTimeout, queueConfig.MaxDequeueCount);
Expand Down
7 changes: 6 additions & 1 deletion Source/Picton.Messaging/AsyncMultiTenantMessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,12 @@ public async Task StartAsync(CancellationToken cancellationToken)
!queue.Name.Equals($"{_queueNamePrefix}-poison", StringComparison.OrdinalIgnoreCase))
{
// AddQueue will make sure to add the queue only if it's not already in the round-robin list of queues.
_messagePump.AddQueue(queue.Name, $"{_queueNamePrefix}-poison", _visibilityTimeout, _maxDequeueCount);
_messagePump.AddQueue(
queue.Name,
$"{_queueNamePrefix}-poison", // All tenants share the same "poison" queue
_visibilityTimeout,
_maxDequeueCount,
$"{_queueNamePrefix}-oversize-messages"); // All tenants share the same "oversize messages" blob storage
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions Source/Picton.Messaging/QueueConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ namespace Picton.Messaging
{
public record QueueConfig
{
public QueueConfig(string queueName, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3)
public QueueConfig(string queueName, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, string oversizedMessagesBlobStorageName = null)
{
QueueName = queueName ?? throw new ArgumentNullException(nameof(queueName));
PoisonQueueName = poisonQueueName;
VisibilityTimeout = visibilityTimeout;
MaxDequeueCount = maxDequeueCount;
OversizedMessagesBlobStorageName = oversizedMessagesBlobStorageName;
}

public string QueueName { get; set; }
public string QueueName { get; set; } = null;

public string PoisonQueueName { get; set; } = null;

public string OversizedMessagesBlobStorageName { get; set; } = null;

public TimeSpan? VisibilityTimeout { get; set; } = null;

public int MaxDequeueCount { get; set; } = 3;
Expand Down

0 comments on commit 0573d39

Please sign in to comment.