Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement IdleDuration for RedisTokenBucketRateLimiter #157

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions src/RedisRateLimiting/Concurrency/RedisConcurrencyRateLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
Expand All @@ -10,6 +11,8 @@ namespace RedisRateLimiting
{
public class RedisConcurrencyRateLimiter<TKey> : RateLimiter
{
private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

private readonly RedisConcurrencyManager _redisManager;
private readonly RedisConcurrencyRateLimiterOptions _options;
private readonly ConcurrentQueue<Request> _queue = new();
Expand All @@ -20,7 +23,12 @@ public class RedisConcurrencyRateLimiter<TKey> : RateLimiter

private readonly ConcurrencyLease FailedLease = new(false, null, null);

public override TimeSpan? IdleDuration => TimeSpan.Zero;
private int _activeRequestsCount;
private long _idleSince = Stopwatch.GetTimestamp();

public override TimeSpan? IdleDuration => Interlocked.CompareExchange(ref _activeRequestsCount, 0, 0) > 0
? null
: new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));

public RedisConcurrencyRateLimiter(TKey partitionKey, RedisConcurrencyRateLimiterOptions options)
{
Expand Down Expand Up @@ -64,14 +72,24 @@ public RedisConcurrencyRateLimiter(TKey partitionKey, RedisConcurrencyRateLimite
return _redisManager.GetStatistics();
}

protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
{
_idleSince = Stopwatch.GetTimestamp();
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, string.Format("{0} permit(s) exceeds the permit limit of {1}.", permitCount, _options.PermitLimit));
}

return AcquireAsyncCoreInternal(cancellationToken);
Interlocked.Increment(ref _activeRequestsCount);
try
{
return await AcquireAsyncCoreInternal(cancellationToken);
}
finally
{
Interlocked.Decrement(ref _activeRequestsCount);
_idleSince = Stopwatch.GetTimestamp();
}
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
Expand Down
22 changes: 20 additions & 2 deletions src/RedisRateLimiting/FixedWindow/RedisFixedWindowRateLimiter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RedisRateLimiting.Concurrency;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
Expand All @@ -9,12 +10,19 @@ namespace RedisRateLimiting
{
public class RedisFixedWindowRateLimiter<TKey> : RateLimiter
{
private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

private readonly RedisFixedWindowManager _redisManager;
private readonly RedisFixedWindowRateLimiterOptions _options;

private readonly FixedWindowLease FailedLease = new(isAcquired: false, null);

public override TimeSpan? IdleDuration => TimeSpan.Zero;
private int _activeRequestsCount;
private long _idleSince = Stopwatch.GetTimestamp();

public override TimeSpan? IdleDuration => Interlocked.CompareExchange(ref _activeRequestsCount, 0, 0) > 0
? null
: new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));

public RedisFixedWindowRateLimiter(TKey partitionKey, RedisFixedWindowRateLimiterOptions options)
{
Expand Down Expand Up @@ -74,7 +82,17 @@ private async ValueTask<RateLimitLease> AcquireAsyncCoreInternal(int permitCount
Window = _options.Window,
};

var response = await _redisManager.TryAcquireLeaseAsync(permitCount);
RedisFixedWindowResponse response;
Interlocked.Increment(ref _activeRequestsCount);
try
{
response = await _redisManager.TryAcquireLeaseAsync(permitCount);
}
finally
{
Interlocked.Decrement(ref _activeRequestsCount);
_idleSince = Stopwatch.GetTimestamp();
}

leaseContext.Count = response.Count;
leaseContext.RetryAfter = response.RetryAfter;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RedisRateLimiting.Concurrency;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
Expand All @@ -9,12 +10,19 @@ namespace RedisRateLimiting
{
public class RedisSlidingWindowRateLimiter<TKey> : RateLimiter
{
private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

private readonly RedisSlidingWindowManager _redisManager;
private readonly RedisSlidingWindowRateLimiterOptions _options;

private readonly SlidingWindowLease FailedLease = new(isAcquired: false, null);

public override TimeSpan? IdleDuration => TimeSpan.Zero;
private int _activeRequestsCount;
private long _idleSince = Stopwatch.GetTimestamp();

public override TimeSpan? IdleDuration => Interlocked.CompareExchange(ref _activeRequestsCount, 0, 0) > 0
? null
: new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));

public RedisSlidingWindowRateLimiter(TKey partitionKey, RedisSlidingWindowRateLimiterOptions options)
{
Expand Down Expand Up @@ -50,14 +58,24 @@ public RedisSlidingWindowRateLimiter(TKey partitionKey, RedisSlidingWindowRateLi
return _redisManager.GetStatistics();
}

protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
{
_idleSince = Stopwatch.GetTimestamp();
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, string.Format("{0} permit(s) exceeds the permit limit of {1}.", permitCount, _options.PermitLimit));
}

return AcquireAsyncCoreInternal();
Interlocked.Increment(ref _activeRequestsCount);
try
{
return await AcquireAsyncCoreInternal();
}
finally
{
Interlocked.Decrement(ref _activeRequestsCount);
_idleSince = Stopwatch.GetTimestamp();
}
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
Expand Down
24 changes: 21 additions & 3 deletions src/RedisRateLimiting/TokenBucket/RedisTokenBucketRateLimiter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using RedisRateLimiting.Concurrency;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
Expand All @@ -9,12 +10,19 @@ namespace RedisRateLimiting
{
public class RedisTokenBucketRateLimiter<TKey> : RateLimiter
{
private static readonly double TickFrequency = (double)TimeSpan.TicksPerSecond / Stopwatch.Frequency;

private readonly RedisTokenBucketManager _redisManager;
private readonly RedisTokenBucketRateLimiterOptions _options;

private readonly TokenBucketLease FailedLease = new(isAcquired: false, null);

public override TimeSpan? IdleDuration => TimeSpan.Zero;
private int _activeRequestsCount;
private long _idleSince = Stopwatch.GetTimestamp();

public override TimeSpan? IdleDuration => Interlocked.CompareExchange(ref _activeRequestsCount, 0, 0) > 0
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick q: why do we need to track the number of active requests? Can't we just rely on the _idleSince timestamp?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to ensure that during AcquireAsyncCoreInternal execution we will never report we are idle when we don't.

Theoretically if we have infrequent usage of that rate limiter and for some reason Redis connection is slow and request takes 10+ seconds, than we will report 10+ seconds to .Net and then there is a chance .Net will decide that our rate limiter instance is idle and needs to be disposed, even though we are waiting for AcquireAsyncCoreInternal to complete

Probably, that's an overkill and in reality Redis shouldn't be that slow, but I didn't want to make such assumptions for common library.

? null
: new TimeSpan((long)((Stopwatch.GetTimestamp() - _idleSince) * TickFrequency));

public RedisTokenBucketRateLimiter(TKey partitionKey, RedisTokenBucketRateLimiterOptions options)
{
Expand Down Expand Up @@ -55,14 +63,24 @@ public RedisTokenBucketRateLimiter(TKey partitionKey, RedisTokenBucketRateLimite
throw new NotImplementedException();
}

protected override ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
protected override async ValueTask<RateLimitLease> AcquireAsyncCore(int permitCount, CancellationToken cancellationToken)
{
_idleSince = Stopwatch.GetTimestamp();
if (permitCount > _options.TokenLimit)
{
throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, string.Format("{0} permit(s) exceeds the permit limit of {1}.", permitCount, _options.TokenLimit));
}

return AcquireAsyncCoreInternal(permitCount);
Interlocked.Increment(ref _activeRequestsCount);
try
{
return await AcquireAsyncCoreInternal(permitCount);
}
finally
{
Interlocked.Decrement(ref _activeRequestsCount);
_idleSince = Stopwatch.GetTimestamp();
}
}

protected override RateLimitLease AttemptAcquireCore(int permitCount)
Expand Down
20 changes: 20 additions & 0 deletions test/RedisRateLimiting.Tests/UnitTests/ConcurrencyUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,26 @@ public async Task GetPermitWhilePermitEmptyQueueNotEmptyGetsQueued()
using var lease3 = await wait3;
Assert.True(lease3.IsAcquired);
}

[Fact]
public async Task IdleDurationIsUpdated()
{
await using var limiter = new RedisConcurrencyRateLimiter<string>(
partitionKey: Guid.NewGuid().ToString(),
new RedisConcurrencyRateLimiterOptions
{
PermitLimit = 1,
QueueLimit = 1,
TryDequeuePeriod = TimeSpan.FromHours(1),
ConnectionMultiplexerFactory = Fixture.ConnectionMultiplexerFactory,
});
await Task.Delay(TimeSpan.FromMilliseconds(5));
Assert.NotEqual(TimeSpan.Zero, limiter.IdleDuration);

var previousIdleDuration = limiter.IdleDuration;
using var lease = await limiter.AcquireAsync();
Assert.True(limiter.IdleDuration < previousIdleDuration);
}

static internal void ForceDequeue(RedisConcurrencyRateLimiter<string> limiter)
{
Expand Down
19 changes: 19 additions & 0 deletions test/RedisRateLimiting.Tests/UnitTests/FixedWindowUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,24 @@ public async Task CanAcquireMultiplePermits()
using var lease3 = await limiter.AcquireAsync(permitCount: 2);
Assert.True(lease3.IsAcquired);
}

[Fact]
public async Task IdleDurationIsUpdated()
{
await using var limiter = new RedisFixedWindowRateLimiter<string>(
partitionKey: Guid.NewGuid().ToString(),
new RedisFixedWindowRateLimiterOptions
{
PermitLimit = 1,
Window = TimeSpan.FromMinutes(1),
ConnectionMultiplexerFactory = Fixture.ConnectionMultiplexerFactory,
});
await Task.Delay(TimeSpan.FromMilliseconds(5));
Assert.NotEqual(TimeSpan.Zero, limiter.IdleDuration);

var previousIdleDuration = limiter.IdleDuration;
using var lease = await limiter.AcquireAsync();
Assert.True(limiter.IdleDuration < previousIdleDuration);
}
}
}
19 changes: 19 additions & 0 deletions test/RedisRateLimiting.Tests/UnitTests/SlidingWindowUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,24 @@ public async Task CanAcquireAsyncResourceWithSmallWindow()
using var lease4 = await limiter.AcquireAsync();
Assert.False(lease4.IsAcquired);
}

[Fact]
public async Task IdleDurationIsUpdated()
{
await using var limiter = new RedisSlidingWindowRateLimiter<string>(
partitionKey: Guid.NewGuid().ToString(),
new RedisSlidingWindowRateLimiterOptions
{
PermitLimit = 1,
Window = TimeSpan.FromMilliseconds(600),
ConnectionMultiplexerFactory = Fixture.ConnectionMultiplexerFactory,
});
await Task.Delay(TimeSpan.FromMilliseconds(5));
Assert.NotEqual(TimeSpan.Zero, limiter.IdleDuration);

var previousIdleDuration = limiter.IdleDuration;
using var lease = await limiter.AcquireAsync();
Assert.True(limiter.IdleDuration < previousIdleDuration);
}
}
}
20 changes: 20 additions & 0 deletions test/RedisRateLimiting.Tests/UnitTests/TokenBucketUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,25 @@ public async Task CanAcquireMultiPermits()
using var lease3 = await limiter.AcquireAsync(1);
Assert.True(lease3.IsAcquired);
}

[Fact]
public async Task IdleDurationIsUpdated()
{
await using var limiter = new RedisTokenBucketRateLimiter<string>(
partitionKey: Guid.NewGuid().ToString(),
new RedisTokenBucketRateLimiterOptions
{
TokenLimit = 1,
TokensPerPeriod = 1,
ReplenishmentPeriod = TimeSpan.FromMinutes(1),
ConnectionMultiplexerFactory = Fixture.ConnectionMultiplexerFactory,
});
await Task.Delay(TimeSpan.FromMilliseconds(5));
Assert.NotEqual(TimeSpan.Zero, limiter.IdleDuration);

var previousIdleDuration = limiter.IdleDuration;
using var lease = await limiter.AcquireAsync();
Assert.True(limiter.IdleDuration < previousIdleDuration);
}
}
}
Loading