Skip to content

Commit

Permalink
System.Threading.RateLimiting de-queue canceled request (#77182)
Browse files Browse the repository at this point in the history
* Dequeue canceled request before making decision for processing queue’s requests
  • Loading branch information
Ali Khalili committed Nov 2, 2022
1 parent f84c1b5 commit fbc2057
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,17 @@ private void Release(int releaseCount)
? _queue.PeekHead()
: _queue.PeekTail();

if (_permitCount >= nextPendingRequest.Count)
// Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued.
// We just need to remove the item and let the next queued item be considered for completion.
if (nextPendingRequest.Tcs.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else if (_permitCount >= nextPendingRequest.Count)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,17 @@ private void ReplenishInternal(long nowTicks)
? _queue.PeekHead()
: _queue.PeekTail();

if (_permitCount >= nextPendingRequest.Count)
// Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued.
// We just need to remove the item and let the next queued item be considered for completion.
if (nextPendingRequest.Tcs.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else if (_permitCount >= nextPendingRequest.Count)
{
// Request can be fulfilled
nextPendingRequest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,18 @@ private void ReplenishInternal(long nowTicks)
? _queue.PeekHead()
: _queue.PeekTail();

// Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued.
// We just need to remove the item and let the next queued item be considered for completion.
if (nextPendingRequest.Tcs.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
// If we have enough permits after replenishing to serve the queued requests
if (_permitCount >= nextPendingRequest.Count)
else if (_permitCount >= nextPendingRequest.Count)
{
// Request can be fulfilled
nextPendingRequest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,17 @@ private void ReplenishInternal(long nowTicks)
? queue.PeekHead()
: queue.PeekTail();

if (_tokenCount >= nextPendingRequest.Count)
// Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued.
// We just need to remove the item and let the next queued item be considered for completion.
if (nextPendingRequest.Tcs.Task.IsCompleted)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? queue.DequeueHead()
: queue.DequeueTail();
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else if (_tokenCount >= nextPendingRequest.Count)
{
// Request can be fulfilled
nextPendingRequest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public abstract class BaseRateLimiterTests
[Fact]
public abstract Task CanCancelAcquireAsyncAfterQueuing();

[Fact]
public abstract Task CanFillQueueWithOldestFirstAfterCancelingFirstQueuedRequestManually();

[Fact]
public abstract Task CanCancelAcquireAsyncBeforeQueuing();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,39 @@ public override async Task CanCancelAcquireAsyncAfterQueuing()
Assert.Equal(1, limiter.GetStatistics()?.CurrentAvailablePermits);
}

[Fact]
public override async Task CanFillQueueWithOldestFirstAfterCancelingFirstQueuedRequestManually()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions
{
PermitLimit = 3,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 3,
});

var lease = limiter.AttemptAcquire(2);
Assert.True(lease.IsAcquired);

var lease1 = limiter.AttemptAcquire(1);
Assert.True(lease1.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.AcquireAsync(3, cts.Token);
cts.Cancel();

var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
Assert.Equal(cts.Token, ex.CancellationToken);

var wait2 = limiter.AcquireAsync(2);

lease.Dispose();
lease = await wait2;
Assert.True(lease.IsAcquired);

Assert.Equal(0, limiter.GetStatistics().CurrentAvailablePermits);
Assert.Equal(0, limiter.GetStatistics().CurrentQueuedCount);
}

[Fact]
public override async Task CanCancelAcquireAsyncBeforeQueuing()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,41 @@ public override async Task CanCancelAcquireAsyncAfterQueuing()
Assert.Equal(1, limiter.GetStatistics().CurrentAvailablePermits);
}

[Fact]
public override async Task CanFillQueueWithOldestFirstAfterCancelingFirstQueuedRequestManually()
{
var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{
PermitLimit = 3,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 3,
Window = TimeSpan.FromMilliseconds(2),
AutoReplenishment = false
});

var lease = limiter.AttemptAcquire(1);
Assert.True(lease.IsAcquired);

lease = limiter.AttemptAcquire(2);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.AcquireAsync(2, cts.Token);
cts.Cancel();

var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
Assert.Equal(cts.Token, ex.CancellationToken);

var wait2 = limiter.AcquireAsync(1);
Replenish(limiter, 2L);

lease = await wait2;
Assert.True(lease.IsAcquired);

Assert.Equal(2, limiter.GetStatistics().CurrentAvailablePermits);
Assert.Equal(0, limiter.GetStatistics().CurrentQueuedCount);
}

[Fact]
public override async Task CanCancelAcquireAsyncBeforeQueuing()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,43 @@ public override async Task CanCancelAcquireAsyncAfterQueuing()
Assert.Equal(0, limiter.GetStatistics().CurrentAvailablePermits);
}

[Fact]
public override async Task CanFillQueueWithOldestFirstAfterCancelingFirstQueuedRequestManually()
{
var limiter = new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions
{
PermitLimit = 3,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 3,
Window = TimeSpan.FromMilliseconds(2),
SegmentsPerWindow = 2,
AutoReplenishment = false
});

var lease = limiter.AttemptAcquire(1);
Assert.True(lease.IsAcquired);
Replenish(limiter, 1L);

lease = limiter.AttemptAcquire(2);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.AcquireAsync(2, cts.Token);
cts.Cancel();

var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
Assert.Equal(cts.Token, ex.CancellationToken);

var wait2 = limiter.AcquireAsync(1);
Replenish(limiter, 1L);

lease = await wait2;
Assert.True(lease.IsAcquired);

Assert.Equal(0, limiter.GetStatistics().CurrentAvailablePermits);
Assert.Equal(0, limiter.GetStatistics().CurrentQueuedCount);
}

[Fact]
public override async Task CanCancelAcquireAsyncBeforeQueuing()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,39 @@ public override async Task CanCancelAcquireAsyncAfterQueuing()
Assert.Equal(1, limiter.GetStatistics().CurrentAvailablePermits);
}

[Fact]
public override async Task CanFillQueueWithOldestFirstAfterCancelingFirstQueuedRequestManually()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions
{
TokenLimit = 3,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 3,
ReplenishmentPeriod = TimeSpan.FromMilliseconds(3),
TokensPerPeriod = 3,
AutoReplenishment = false
});

var lease = limiter.AttemptAcquire(3);
Assert.True(lease.IsAcquired);

var cts = new CancellationTokenSource();
var wait = limiter.AcquireAsync(2, cts.Token);
cts.Cancel();

var ex = await Assert.ThrowsAsync<TaskCanceledException>(() => wait.AsTask());
Assert.Equal(cts.Token, ex.CancellationToken);

var wait2 = limiter.AcquireAsync(1);
Replenish(limiter, 1L);

lease = await wait2;
Assert.True(lease.IsAcquired);

Assert.Equal(0, limiter.GetStatistics().CurrentAvailablePermits);
Assert.Equal(0, limiter.GetStatistics().CurrentQueuedCount);
}

[Fact]
public override async Task CanFillQueueWithNewestFirstAfterCancelingQueuedRequestWithAnotherQueuedRequest()
{
Expand Down

0 comments on commit fbc2057

Please sign in to comment.