Skip to content

Commit

Permalink
Add test code for issue #1464
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Feb 20, 2024
1 parent bfba294 commit 9bf0e5c
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 23 deletions.
16 changes: 16 additions & 0 deletions projects/Test/Common/IntegrationFixture.cs
Expand Up @@ -551,6 +551,22 @@ protected static byte[] GetRandomBody(ushort size = 1024)
return body;
}

protected static Task WaitForRecoveryAsync(IConnection conn)
{
TaskCompletionSource<bool> tcs = PrepareForRecovery((AutorecoveringConnection)conn);
return WaitAsync(tcs, "recovery succeded");
}

protected static TaskCompletionSource<bool> PrepareForRecovery(IConnection conn)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

AutorecoveringConnection aconn = conn as AutorecoveringConnection;
aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true);

return tcs;
}

public static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);
}
}
22 changes: 0 additions & 22 deletions projects/Test/Common/TestConnectionRecoveryBase.cs
Expand Up @@ -231,16 +231,6 @@ protected static TaskCompletionSource<bool> PrepareForShutdown(IConnection conn)
return tcs;
}

protected static TaskCompletionSource<bool> PrepareForRecovery(IConnection conn)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

AutorecoveringConnection aconn = conn as AutorecoveringConnection;
aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true);

return tcs;
}

protected static Task<bool> WaitForConfirmsWithCancellationAsync(IChannel m)
{
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4)))
Expand All @@ -249,18 +239,6 @@ protected static Task<bool> WaitForConfirmsWithCancellationAsync(IChannel m)
}
}

protected Task WaitForRecoveryAsync()
{
TaskCompletionSource<bool> tcs = PrepareForRecovery((AutorecoveringConnection)_conn);
return WaitAsync(tcs, "recovery succeded");
}

internal Task WaitForRecoveryAsync(AutorecoveringConnection conn)
{
TaskCompletionSource<bool> tcs = PrepareForRecovery(conn);
return WaitAsync(tcs, "recovery succeeded");
}

protected Task WaitForShutdownAsync()
{
TaskCompletionSource<bool> tcs = PrepareForShutdown(_conn);
Expand Down
60 changes: 60 additions & 0 deletions projects/Test/Integration/TestToxiproxy.cs
Expand Up @@ -169,6 +169,66 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
_output.WriteLine($"[INFO] heartbeat timeout took {sw.Elapsed}");
}

[SkippableFact]
[Trait("Category", "Toxiproxy")]
public async Task TestTcpReset_GH1464()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");

ConnectionFactory cf = CreateConnectionFactory();
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), ProxyPort);
cf.Port = ProxyPort;
cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
cf.AutomaticRecoveryEnabled = true;

var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var connectionShutdownTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

Task recoveryTask = Task.Run(async () =>
{
using (IConnection conn = await cf.CreateConnectionAsync())
{
conn.ConnectionShutdown += (o, ea) =>
{
connectionShutdownTcs.SetResult(true);
};
using (IChannel ch = await conn.CreateChannelAsync())
{
channelCreatedTcs.SetResult(true);
await WaitForRecoveryAsync(conn);
await ch.CloseAsync();
}
await conn.CloseAsync();
}
});

Assert.True(await channelCreatedTcs.Task);

const string toxicName = "rmq-localhost-reset_peer";
var resetPeerToxic = new ResetPeerToxic();
resetPeerToxic.Name = toxicName;
resetPeerToxic.Attributes.Timeout = 500;
resetPeerToxic.Toxicity = 1.0;

var sw = new Stopwatch();
sw.Start();

await _rmqProxy.AddAsync(resetPeerToxic);
Task<Proxy> updateProxyTask = _rmqProxy.UpdateAsync();

await Task.WhenAll(updateProxyTask, connectionShutdownTcs.Task);

await _rmqProxy.RemoveToxicAsync(toxicName);

await recoveryTask;

sw.Stop();

_output.WriteLine($"[INFO] reset peer took {sw.Elapsed}");
}

private bool AreToxiproxyTestsEnabled
{
get
Expand Down

0 comments on commit 9bf0e5c

Please sign in to comment.