Skip to content

Commit

Permalink
* Make handling of server-originated Basic.Cancel async to try to a…
Browse files Browse the repository at this point in the history
…ddress this failure - https://github.com/rabbitmq/rabbitmq-dotnet-client/actions/runs/8083493731/job/22086889719

* Make more code async

* Fix when `Dispose` is called for subclasses of `AsyncRpcContinuation`

I'm not exactly sure why this test sometimes fails 🤔
  • Loading branch information
lukebakken committed Mar 5, 2024
1 parent de8e1e4 commit 0682db6
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 129 deletions.
47 changes: 27 additions & 20 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
@@ -1,5 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using RabbitMQ.Client.ConsumerDispatching;
Expand All @@ -8,7 +9,7 @@ namespace RabbitMQ.Benchmarks
{
[Config(typeof(Config))]
[BenchmarkCategory("ConsumerDispatcher")]
internal class ConsumerDispatcherBase
public class ConsumerDispatcherBase
{
protected static readonly ManualResetEventSlim _autoResetEvent = new ManualResetEventSlim(false);

Expand All @@ -19,18 +20,16 @@ internal class ConsumerDispatcherBase
protected readonly string _exchange = "Exchange";
protected readonly string _routingKey = "RoutingKey";
protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties();
protected readonly RentedMemory _body;
protected readonly byte[] _body = new byte[512];

public ConsumerDispatcherBase()
{
var r = new Random();
byte[] body = new byte[512];
r.NextBytes(body);
_body = new RentedMemory(body);
r.NextBytes(_body);
}
}

internal class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
public class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
{
[Params(1, 30)]
public int Count { get; set; }
Expand All @@ -39,41 +38,49 @@ internal class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
public int Concurrency { get; set; }

[GlobalSetup(Target = nameof(AsyncConsumerDispatcher))]
public void SetUpAsyncConsumer()
public async Task SetUpAsyncConsumer()
{
_consumer.Count = Count;
_dispatcher = new AsyncConsumerDispatcher(null, Concurrency);
_dispatcher.HandleBasicConsumeOk(_consumer, _consumerTag);
await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None);
}

[Benchmark]
public void AsyncConsumerDispatcher()
public async Task AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
using (RentedMemory body = new RentedMemory(_body))
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body);
for (int i = 0; i < Count; i++)
{
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body,
CancellationToken.None);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}

[GlobalSetup(Target = nameof(ConsumerDispatcher))]
public void SetUpConsumer()
public async Task SetUpConsumer()
{
_consumer.Count = Count;
_dispatcher = new ConsumerDispatcher(null, Concurrency);
_dispatcher.HandleBasicConsumeOk(_consumer, _consumerTag);
await _dispatcher.HandleBasicConsumeOkAsync(_consumer, _consumerTag, CancellationToken.None);
}

[Benchmark]
public void ConsumerDispatcher()
public async Task ConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
using (RentedMemory body = new RentedMemory(_body))
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body);
for (int i = 0; i < Count; i++)
{
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, body,
CancellationToken.None);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}
}
}
20 changes: 10 additions & 10 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Expand Up @@ -78,13 +78,13 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
{
case ProtocolCommandId.BasicCancel:
{
HandleBasicCancel(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleBasicCancelAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicDeliver:
{
HandleBasicDeliver(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleBasicDeliverAsync(cmd, cancellationToken);
}
case ProtocolCommandId.BasicAck:
{
Expand All @@ -108,8 +108,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ChannelCloseOk:
{
HandleChannelCloseOk(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleChannelCloseOkAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ChannelFlow:
{
Expand All @@ -128,8 +128,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ConnectionSecure:
{
HandleConnectionSecure(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleConnectionSecureAsync(cmd);
}
case ProtocolCommandId.ConnectionStart:
{
Expand All @@ -138,8 +138,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ConnectionTune:
{
HandleConnectionTune(in cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleConnectionTuneAsync(cmd);
}
case ProtocolCommandId.ConnectionUnblocked:
{
Expand Down
38 changes: 26 additions & 12 deletions projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
Expand Up @@ -41,7 +41,7 @@

namespace RabbitMQ.Client.Impl
{
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation, IDisposable
internal abstract class AsyncRpcContinuation<T> : IRpcContinuation
{
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly CancellationTokenRegistration _cancellationTokenRegistration;
Expand Down Expand Up @@ -101,7 +101,7 @@ public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
return _tcsConfiguredTaskAwaitable.GetAwaiter();
}

public abstract void HandleCommand(in IncomingCommand cmd);
public abstract Task HandleCommandAsync(IncomingCommand cmd);

public virtual void HandleChannelShutdown(ShutdownEventArgs reason)
{
Expand Down Expand Up @@ -135,7 +135,7 @@ public ConnectionSecureOrTuneAsyncRpcContinuation(TimeSpan continuationTimeout)
{
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -156,6 +156,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand All @@ -173,7 +175,7 @@ public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan
_expectedCommandId = expectedCommandId;
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -185,6 +187,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand All @@ -205,7 +209,7 @@ public BasicCancelAsyncRpcContinuation(string consumerTag, IConsumerDispatcher c
_consumerDispatcher = consumerDispatcher;
}

public override void HandleCommand(in IncomingCommand cmd)
public override async Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -214,7 +218,8 @@ public override void HandleCommand(in IncomingCommand cmd)
var method = new Client.Framing.Impl.BasicCancelOk(cmd.MethodSpan);
_tcs.TrySetResult(true);
Debug.Assert(_consumerTag == method._consumerTag);
_consumerDispatcher.HandleBasicCancelOk(_consumerTag);
await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken)
.ConfigureAwait(false);
}
else
{
Expand All @@ -240,15 +245,16 @@ public BasicConsumeAsyncRpcContinuation(IBasicConsumer consumer, IConsumerDispat
_consumerDispatcher = consumerDispatcher;
}

public override void HandleCommand(in IncomingCommand cmd)
public override async Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
if (cmd.CommandId == ProtocolCommandId.BasicConsumeOk)
{
var method = new Client.Framing.Impl.BasicConsumeOk(cmd.MethodSpan);
_tcs.TrySetResult(method._consumerTag);
_consumerDispatcher.HandleBasicConsumeOk(_consumer, method._consumerTag);
await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, method._consumerTag, CancellationToken)
.ConfigureAwait(false);
}
else
{
Expand All @@ -272,7 +278,7 @@ public BasicGetAsyncRpcContinuation(Func<ulong, ulong> adjustDeliveryTag, TimeSp
_adjustDeliveryTag = adjustDeliveryTag;
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand Down Expand Up @@ -300,6 +306,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand Down Expand Up @@ -389,7 +397,7 @@ public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout) : base(con
{
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -403,6 +411,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand Down Expand Up @@ -433,7 +443,7 @@ public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout) : base(cont
{
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -446,6 +456,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand All @@ -460,7 +472,7 @@ public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout) : base(conti
{
}

public override void HandleCommand(in IncomingCommand cmd)
public override Task HandleCommandAsync(IncomingCommand cmd)
{
try
{
Expand All @@ -473,6 +485,8 @@ public override void HandleCommand(in IncomingCommand cmd)
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}

return Task.CompletedTask;
}
finally
{
Expand Down

0 comments on commit 0682db6

Please sign in to comment.