From 30ae20a8ae851baa62732e50baf357fa488c5947 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 27 Mar 2024 10:31:20 -0700 Subject: [PATCH] Fix `econnaborted` seen in integration test runs This error was introduced with commit 1fa0562 The main frame reading loop was cancelled too soon. --- .../RabbitMQ.Client/client/impl/Connection.Commands.cs | 4 ++-- .../RabbitMQ.Client/client/impl/Connection.Heartbeat.cs | 9 +++++++-- .../RabbitMQ.Client/client/impl/Connection.Receive.cs | 7 +++++-- projects/RabbitMQ.Client/client/impl/Connection.cs | 4 ++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index a80b11806..4a2f0ce36 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -50,7 +50,7 @@ public Task UpdateSecretAsync(string newSecret, string reason) internal void NotifyReceivedCloseOk() { - TerminateMainloop(); + MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true); _closed = true; } @@ -112,7 +112,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken) var serverVersion = new AmqpVersion(connectionStart.m_versionMajor, connectionStart.m_versionMinor); if (!serverVersion.Equals(Protocol.Version)) { - TerminateMainloop(); + MaybeTerminateMainloopAndStopHeartbeatTimers(); await FinishCloseAsync(cancellationToken); throw new ProtocolVersionMismatchException(Protocol.MajorVersion, Protocol.MinorVersion, serverVersion.Major, serverVersion.Minor); } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs index 2f6750fe1..97b3b900d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs @@ -116,8 +116,13 @@ private async void HeartbeatReadTimerCallback(object? state) if (shouldTerminate) { - TerminateMainloop(); - await FinishCloseAsync(_mainLoopCts.Token) + MaybeTerminateMainloopAndStopHeartbeatTimers(); + /* + * Note: do NOT use the main loop cancellation token, + * because FininshCloseAsync immediately cancels it + */ + using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionAbortTimeout); + await FinishCloseAsync(cts.Token) .ConfigureAwait(false); } else diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index bfd907698..8828416af 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -165,9 +165,12 @@ private async Task ProcessFrameAsync(InboundFrame frame, CancellationToken cance /// /// May be called more than once. Should therefore be idempotent. /// - private void TerminateMainloop() + private void MaybeTerminateMainloopAndStopHeartbeatTimers(bool cancelMainLoop = false) { - _mainLoopCts.Cancel(); + if (cancelMainLoop) + { + _mainLoopCts.Cancel(); + } MaybeStopHeartbeatTimers(); } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 6c93d895d..d0d5bf75b 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -362,7 +362,7 @@ await _session0.TransmitAsync(method, cancellationToken) } finally { - TerminateMainloop(); + MaybeTerminateMainloopAndStopHeartbeatTimers(); } } @@ -402,7 +402,7 @@ internal void ClosedViaPeer(ShutdownEventArgs reason) OnShutdown(reason); _session0.SetSessionClosing(true); - TerminateMainloop(); + MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true); } // Only call at the end of the Mainloop or HeartbeatLoop