diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index a80b11806..4a2f0ce36 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -50,7 +50,7 @@ public Task UpdateSecretAsync(string newSecret, string reason) internal void NotifyReceivedCloseOk() { - TerminateMainloop(); + MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true); _closed = true; } @@ -112,7 +112,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken) var serverVersion = new AmqpVersion(connectionStart.m_versionMajor, connectionStart.m_versionMinor); if (!serverVersion.Equals(Protocol.Version)) { - TerminateMainloop(); + MaybeTerminateMainloopAndStopHeartbeatTimers(); await FinishCloseAsync(cancellationToken); throw new ProtocolVersionMismatchException(Protocol.MajorVersion, Protocol.MinorVersion, serverVersion.Major, serverVersion.Minor); } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs index 2f6750fe1..97b3b900d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs @@ -116,8 +116,13 @@ private async void HeartbeatReadTimerCallback(object? state) if (shouldTerminate) { - TerminateMainloop(); - await FinishCloseAsync(_mainLoopCts.Token) + MaybeTerminateMainloopAndStopHeartbeatTimers(); + /* + * Note: do NOT use the main loop cancellation token, + * because FininshCloseAsync immediately cancels it + */ + using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); + await FinishCloseAsync(cts.Token) .ConfigureAwait(false); } else diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index bfd907698..8828416af 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -165,9 +165,12 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance /// /// May be called more than once. Should therefore be idempotent. /// - private void TerminateMainloop() + private void MaybeTerminateMainloopAndStopHeartbeatTimers(bool cancelMainLoop = false) { - _mainLoopCts.Cancel(); + if (cancelMainLoop) + { + _mainLoopCts.Cancel(); + } MaybeStopHeartbeatTimers(); } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 6c93d895d..d0d5bf75b 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -362,7 +362,7 @@ await _session0.TransmitAsync(method, cancellationToken) } finally { - TerminateMainloop(); + MaybeTerminateMainloopAndStopHeartbeatTimers(); } } @@ -402,7 +402,7 @@ internal void ClosedViaPeer(ShutdownEventArgs reason) OnShutdown(reason); _session0.SetSessionClosing(true); - TerminateMainloop(); + MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true); } // Only call at the end of the Mainloop or HeartbeatLoop