Skip to content

Commit

Permalink
Remove TODO
Browse files Browse the repository at this point in the history
Add assertion

Remove assertion, add comment

* Add `ConfirmsAreEnabled` in `ChannelBase`

* Add `CancellationToken` to `CreateChannelAsync`

* Use `CancellationToken` argument in `OpenAsync`
  • Loading branch information
lukebakken committed Mar 13, 2024
1 parent f9dd5c6 commit 0866269
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 47 deletions.
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Expand Up @@ -919,7 +919,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.IChannel.TxRollbackAsync() -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxSelectAsync() -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
~RabbitMQ.Client.IConnection.CreateChannelAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
Expand Down
9 changes: 5 additions & 4 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Expand Up @@ -225,13 +225,14 @@ public interface IConnection : INetworkConnection, IDisposable
/// <param name="reasonText">A message indicating the reason for closing the connection.</param>
/// <param name="timeout"></param>
/// <param name="abort">Whether or not this close is an abort (ignores certain exceptions).</param>
/// <param name="cancellationToken"></param>
Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default);
/// <param name="cancellationToken">Cancellation token</param>
Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort,
CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
// TODO cancellation token
Task<IChannel> CreateChannelAsync();
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);
}
}
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Expand Up @@ -146,7 +146,7 @@ public IBasicConsumer DefaultConsumer
public string CurrentQueue => InnerChannel.CurrentQueue;

internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
bool recordedEntitiesSemaphoreHeld = false)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (false == recordedEntitiesSemaphoreHeld)
{
Expand All @@ -156,7 +156,7 @@ public IBasicConsumer DefaultConsumer
ThrowIfDisposed();
_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync()
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken)
.ConfigureAwait(false);
newChannel.TakeOver(_innerChannel);

Expand Down
Expand Up @@ -731,15 +731,15 @@ private bool AnyConsumersOnQueue(string queue)
}

private async Task RecordChannelAsync(AutorecoveringChannel channel,
bool channelsSemaphoreHeld = false)
bool channelsSemaphoreHeld, CancellationToken cancellationToken)
{
if (channelsSemaphoreHeld)
{
DoAddRecordedChannel(channel);
}
else
{
await _channelsSemaphore.WaitAsync()
await _channelsSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
Expand All @@ -757,6 +757,7 @@ private void DoAddRecordedChannel(AutorecoveringChannel channel)
_channels.Add(channel);
}

// TODO remove this unused method
internal void DeleteRecordedChannel(in AutorecoveringChannel channel,
bool channelsSemaphoreHeld, bool recordedEntitiesSemaphoreHeld)
{
Expand Down
Expand Up @@ -184,7 +184,7 @@ await RecoverBindingsAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true
.ConfigureAwait(false);

}
await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true)
await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
finally
Expand Down Expand Up @@ -541,7 +541,7 @@ void UpdateConsumer(string oldTag, string newTag, in RecordedConsumer consumer)
}
}

private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitiesSemaphoreHeld = false)
private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (false == recordedEntitiesSemaphoreHeld)
{
Expand All @@ -551,7 +551,8 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie
foreach (AutorecoveringChannel channel in _channels)
{
await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled,
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld)
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down
10 changes: 5 additions & 5 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Expand Up @@ -175,11 +175,11 @@ private void CreateInnerConnection(IFrameHandler frameHandler)

public IProtocol Protocol => Endpoint.Protocol;

public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync()
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(CancellationToken cancellationToken)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session);
return await result.OpenAsync()
return await result.OpenAsync(cancellationToken)
.ConfigureAwait(false) as RecoveryAwareChannel;
}

Expand Down Expand Up @@ -241,13 +241,13 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync()
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
{
EnsureIsOpen();
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync()
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cancellationToken)
.ConfigureAwait(false);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel);
await RecordChannelAsync(channel, channelsSemaphoreHeld: false)
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return channel;
}
Expand Down
58 changes: 34 additions & 24 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Expand Up @@ -59,7 +59,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);

private readonly object _confirmLock = new object();
private object _confirmLock;
private readonly LinkedList<ulong> _pendingDeliveryTags = new LinkedList<ulong>();

private bool _onlyAcksReceived = true;
Expand Down Expand Up @@ -183,7 +183,6 @@ public IBasicConsumer DefaultConsumer

public bool IsOpen => CloseReason is null;

// TODO add private bool for Confirm mode
public ulong NextPublishSeqNo { get; private set; }

public string CurrentQueue { get; private set; }
Expand Down Expand Up @@ -376,19 +375,22 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync()
internal async Task<IChannel> OpenAsync(CancellationToken cancellationToken)
{
bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout);

await _rpcSemaphore.WaitAsync(k.CancellationToken)
using CancellationTokenSource lts =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, k.CancellationToken);

await _rpcSemaphore.WaitAsync(lts.Token)
.ConfigureAwait(false);
try
{
enqueued = Enqueue(k);

var method = new ChannelOpen();
await ModelSendAsync(method, k.CancellationToken)
await ModelSendAsync(method, lts.Token)
.ConfigureAwait(false);

bool result = await k;
Expand Down Expand Up @@ -416,6 +418,8 @@ internal void FinishClose()
m_connectionStartCell?.TrySetResult(null);
}

private bool ConfirmsAreEnabled => _confirmLock != null;

private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
/*
Expand Down Expand Up @@ -475,17 +479,21 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
{
_continuationQueue.HandleChannelShutdown(reason);
_channelShutdownWrapper.Invoke(this, reason);
lock (_confirmLock)

if (ConfirmsAreEnabled)
{
if (_confirmsTaskCompletionSources?.Count > 0)
lock (_confirmLock)
{
var exception = new AlreadyClosedException(reason);
foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
if (_confirmsTaskCompletionSources?.Count > 0)
{
confirmsTaskCompletionSource.TrySetException(exception);
}
var exception = new AlreadyClosedException(reason);
foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
{
confirmsTaskCompletionSource.TrySetException(exception);
}

_confirmsTaskCompletionSources.Clear();
_confirmsTaskCompletionSources.Clear();
}
}
}

Expand Down Expand Up @@ -581,7 +589,7 @@ protected void HandleBasicNack(in IncomingCommand cmd)
protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
{
// No need to do this if publisher confirms have never been enabled.
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
lock (_confirmLock)
Expand Down Expand Up @@ -1017,7 +1025,7 @@ await ModelSendAsync(method, k.CancellationToken)
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1047,7 +1055,7 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
}
catch
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1078,7 +1086,7 @@ private static void InjectTraceContextIntoBasicProperties(object propsObj, strin
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1109,7 +1117,7 @@ private static void InjectTraceContextIntoBasicProperties(object propsObj, strin
}
catch
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand All @@ -1126,7 +1134,7 @@ private static void InjectTraceContextIntoBasicProperties(object propsObj, strin
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1157,7 +1165,7 @@ private static void InjectTraceContextIntoBasicProperties(object propsObj, strin
}
catch
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1263,6 +1271,10 @@ await ModelSendAsync(method, k.CancellationToken)
bool result = await k;
Debug.Assert(result);

// Note:
// Non-null means confirms are enabled
_confirmLock = new object();

return;
}
finally
Expand Down Expand Up @@ -1747,7 +1759,7 @@ await ModelSendAsync(method, k.CancellationToken)

public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
{
if (NextPublishSeqNo == 0UL)
if (false == ConfirmsAreEnabled)
{
throw new InvalidOperationException("Confirms not selected");
}
Expand Down Expand Up @@ -1821,17 +1833,15 @@ public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
await CloseAsync(ea, false)
.ConfigureAwait(false);
}
catch (TaskCanceledException)
catch (OperationCanceledException ex)
{
const string msg = "timed out waiting for acks";

var ex = new IOException(msg);
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, msg, ex);

await CloseAsync(ea, false)
.ConfigureAwait(false);

throw ex;
throw;
}
}

Expand Down
7 changes: 4 additions & 3 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Expand Up @@ -256,12 +256,12 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
}
}

public Task<IChannel> CreateChannelAsync()
public Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
{
EnsureIsOpen();
ISession session = CreateSession();
var channel = new Channel(_config, session);
return channel.OpenAsync();
return channel.OpenAsync(cancellationToken);
}

internal ISession CreateSession()
Expand All @@ -285,7 +285,8 @@ internal void EnsureIsOpen()
}

///<summary>Asynchronous API-side invocation of connection.close with timeout.</summary>
public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken)
public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort,
CancellationToken cancellationToken = default)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText);
return CloseAsync(reason, abort, timeout, cancellationToken);
Expand Down
10 changes: 8 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Expand Up @@ -313,8 +313,14 @@ internal static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, uint maxMes
return false;
}

// TODO check this?
// buffer.IsSingleSegment;
/*
* Note:
* The use of buffer.Slice seems to take all segments into account, thus there appears to be no need to check IsSingleSegment
* Debug.Assert(buffer.IsSingleSegment);
* In addition, the TestBasicRoundtripConcurrentManyMessages asserts that the consumed message bodies are equivalent to
* the published bodies, and if there were an issue parsing frames, it would show up there for sure.
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1516#issuecomment-1991943017
*/

byte firstByte = buffer.First.Span[0];
if (firstByte == 'A')
Expand Down
1 change: 0 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Expand Up @@ -428,7 +428,6 @@ await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port, linkedTokenSource.
{
if (timeoutTokenSource.Token.IsCancellationRequested)
{
// TODO maybe do not use System.TimeoutException here
var timeoutException = new TimeoutException(msg, e);
throw new ConnectFailureException(msg, timeoutException);
}
Expand Down

0 comments on commit 0866269

Please sign in to comment.