Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for #1520: Add support for retry on transactions for resharding #2062

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/StackExchange.Redis/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,51 @@ public partial class RedisException : Exception
protected RedisException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { }
}

/// <summary>
/// Raised when a redis server reports that a hashslot has been migrated to another endpoint
/// while the command carried a flag that forbids following redirects.
/// </summary>
/// <remarks>
/// NOTE(review): this declaration is marked [Serializable] but defines no serialization constructor,
/// so <see cref="HashSlot"/> and <see cref="Endpoint"/> presumably do not survive a serialization
/// round trip unless another part of this partial class handles it - confirm.
/// </remarks>
[Serializable]
public sealed partial class RedisHashslotMigratedAndNoRedirectException : RedisException
{
    /// <summary>
    /// Creates a new <see cref="RedisHashslotMigratedAndNoRedirectException"/>.
    /// </summary>
    /// <param name="message">The message for the exception.</param>
    /// <param name="hashSlot">The hashslot that was migrated.</param>
    /// <param name="endpoint">The endpoint where the hashslot is now located.</param>
    public RedisHashslotMigratedAndNoRedirectException(string message, int hashSlot, string endpoint) : base(message)
    {
        HashSlot = hashSlot;
        Endpoint = endpoint;
    }

    /// <summary>
    /// The hashslot that was migrated.
    /// </summary>
    public int HashSlot { get; }

    /// <summary>
    /// The new endpoint that the hashslot is located on.
    /// </summary>
    public string Endpoint { get; }

    /// <summary>
    /// Builds the error text in the format MOVED [HASH_SLOT] [NEW ENDPOINT].
    /// </summary>
    /// <returns>
    /// The text of <c>CommonReplies.MOVED</c>, immediately followed by <see cref="HashSlot"/>,
    /// a single space, and <see cref="Endpoint"/>.
    /// </returns>
    /// <remarks>
    /// No separator is inserted between the MOVED token and the hashslot here; presumably
    /// <c>CommonReplies.MOVED</c> already ends with a space - confirm.
    /// </remarks>
    public string GetMovedErrorMessage() =>
        string.Concat(CommonReplies.MOVED.ToString(), HashSlot.ToString(), " ", Endpoint);
}

/// <summary>
/// Indicates an exception raised by a redis server.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public virtual void AppendStormLog(StringBuilder sb)

bool ICompletable.TryComplete(bool isAsync) { Complete(); return true; }

public void Complete()
public virtual void Complete()
{
//Ensure we can never call Complete on the same resultBox from two threads by grabbing it now
var currBox = Interlocked.Exchange(ref resultBox, null);
Expand Down
6 changes: 5 additions & 1 deletion src/StackExchange.Redis/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@

StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.Endpoint.get -> string
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.GetMovedErrorMessage() -> string
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.HashSlot.get -> int
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.RedisHashslotMigratedAndNoRedirectException(string message, int hashSlot, string endpoint) -> void
133 changes: 130 additions & 3 deletions src/StackExchange.Redis/RedisTransaction.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -56,6 +58,12 @@ public Task<bool> ExecuteAsync(CommandFlags flags)
return base.ExecuteAsync(msg, proc); // need base to avoid our local wrapping override
}

internal bool ExecuteInternal(CommandFlags flags, ServerEndPoint endpoint = null)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge fan of this, but I also didn't want to change the public API of this for the sake of making a functional test work.

{
var msg = CreateMessage(flags, out ResultProcessor<bool> proc);
return base.ExecuteSync(msg, proc, endpoint); // need base to avoid our local "not supported" override
}

internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server = null)
{
if (message == null) return CompletedTask<T>.Default(asyncState);
Expand Down Expand Up @@ -146,6 +154,33 @@ public QueuedMessage(Message message) : base(message.Db, message.Flags | Command
Wrapped = message;
}

// for transactions, the inner operations should only be marked completed when the final EXEC has been processed
/// <summary>
/// For queued messages (InnerOperations) in a transaction, we cannot actually mark the message complete until the EXEC
/// call has returned with the state of the transaction.
///
/// Calling the base Complete() resets the ResultBox, so that call is deferred to TransactionComplete().
/// </summary>
public override void Complete()
{
// still need to activate continuations for GetMessages(),
// which might be waiting for the last innerOperation to
// complete.
ResultBox?.ActivateContinuations();
Copy link
Author

@dusty9023 dusty9023 Apr 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood the logic behind this correctly, in order for the last section of the GetMessages() logic to work, where it waits on the last result box of the innerOperations, this needs to be fired.

This ultimately means that in the lifetime of a QueuedMessage, it will fire/pulse twice: once when it's marked Complete() after it finishes its call to Redis, and then again when TransactionComplete() is called as the TransactionMessage runs its Complete() (which calls TransactionComplete() on all its innerOperations, each of which calls base.Complete()).

}

/// <summary>
/// Finishes this queued operation by running the base <c>Complete()</c> logic, which the
/// <c>Complete()</c> override on this type deliberately does not invoke.
/// </summary>
/// <remarks>
/// Meant to be called once the enclosing transaction itself has completed (EXEC processed);
/// the transaction message calls it for each of its inner operations.
/// </remarks>
public void TransactionComplete() => base.Complete();

public bool WasQueued
{
get => wasQueued;
Expand Down Expand Up @@ -428,6 +463,21 @@ private bool AreAllConditionsSatisfied(ConnectionMultiplexer multiplexer)
}
return result;
}

/// <summary>
/// Completes the transaction message: every inner operation is first told that the
/// transaction has finished, and then the base completion logic runs for this message.
/// </summary>
public override void Complete()
{
    // inner operations are finalized before this message is
    foreach (var queued in InnerOperations)
    {
        queued.TransactionComplete();
    }

    // now finish the transaction message itself
    base.Complete();
}
}

private class TransactionProcessor : ResultProcessor<bool>
Expand All @@ -439,13 +489,90 @@ public override bool SetResult(PhysicalConnection connection, Message message, i
if (result.IsError && message is TransactionMessage tran)
{
string error = result.GetString();
bool isMoved = true;
var migratedErrorMessage = new HashSet<string>();
RedisHashslotMigratedAndNoRedirectException migratedException;

foreach (var op in tran.InnerOperations)
{
var inner = op.Wrapped;
ServerFail(inner, error);
inner.Complete();
var opResultBox = op.ResultBox;
// check if this internal operation errored out
if (opResultBox.IsFaulted)
{
// if this resultbox is one that allows us access to the error
if (opResultBox is IResultBox<bool>)
{
// get the error of the inner operation
var simpleOpResultBox = opResultBox as IResultBox<bool>;
Exception exception;
simpleOpResultBox.GetResult(out exception);

// append the inneroperation error to the transaction error
error += "\n\n" + op.Command.ToString() + ": " + exception?.Message;

// if the error is related to a hashslot being migrated, then add the error to a set.
// if ALL the errors are related to this hashslot being moved, then it's possibly to retry
// the transaction on the new endpoint
if (exception is RedisHashslotMigratedAndNoRedirectException)
{
migratedException = exception as RedisHashslotMigratedAndNoRedirectException;
migratedErrorMessage.Add(migratedException.GetMovedErrorMessage());
} else
{
isMoved = false;
}

} else
{
error += "\n\n" + op.Command.ToString() + ": Undeterminted Error";
// have to assume it's false
isMoved = false;
}
}
}

// all failed due to a hashslot move
if (isMoved && migratedErrorMessage.Count > 0)
{
// there should be a SINGLE MOVED error in the set (same endpoint and hashslot)
if (migratedErrorMessage.Count == 1)
{
// prepend the "MOVED" error to the start of the error, so the ResultProcessor
// is able to detect it, and retry the transaction
error = migratedErrorMessage.First() + " " + error;
foreach (var op in tran.InnerOperations)
{
// reset the state of the internal operations
var wasQueued = SimpleResultBox<bool>.Create();
op.SetSource(wasQueued, QueuedProcessor.Default);
}
}
// the transaction must have utilized multiple hashslots, with multiple ones that moved
else
{
isMoved = false;
error = "Multiple hashslots and/or endpoints detected as MOVED in a single transaction \n\n" + error;
}
}

// if this is not a recoverable MOVED error,
if(!isMoved)
{
// then mark all the inneroperation's wrapped operations as failed, and complete
foreach (var op in tran.InnerOperations)
{
var inner = op.Wrapped;
ServerFail(inner, error);
inner.Complete();
}
}

// take our updated error message, and pass it to the base ResultProcessor.
var newResult = new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(error));
return base.SetResult(connection, message, new RawResult(ResultType.Error, newResult, false));
}

// allow the base processor to process to result of the transaction
return base.SetResult(connection, message, result);
}

Expand Down
20 changes: 17 additions & 3 deletions src/StackExchange.Redis/ResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public static void ConnectionFail(Message message, ConnectionFailureType fail, s
SetException(message, new RedisConnectionException(fail, errorMessage));
}

/// <summary>
/// Fails the message with a <see cref="RedisHashslotMigratedAndNoRedirectException"/> built from
/// the supplied hashslot, endpoint and error text.
/// </summary>
/// <param name="message">The message to set the exception on.</param>
/// <param name="hashSlot">The hashslot that was migrated.</param>
/// <param name="endpoint">The endpoint where the hashslot is now located.</param>
/// <param name="errorMessage">The message for the exception.</param>
public static void HashSlotMigratedAndNoRedirectAllowedFail(Message message, int hashSlot, string endpoint, string errorMessage)
{
    var migrated = new RedisHashslotMigratedAndNoRedirectException(errorMessage, hashSlot, endpoint);
    SetException(message, migrated);
}

public static void ServerFail(Message message, string errorMessage)
{
SetException(message, new RedisServerException(errorMessage));
Expand Down Expand Up @@ -207,14 +212,17 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
bool wasNoRedirect = (message.Flags & CommandFlags.NoRedirect) != 0;
string err = string.Empty;
bool unableToConnectError = false;
bool hashslotMigratedAndNoRedirectError = false;
int hashSlot = -1;
EndPoint endpoint = null;
if (isMoved || result.StartsWith(CommonReplies.ASK))
{
message.SetResponseReceived();

log = false;
string[] parts = result.GetString().Split(StringSplits.Space, 3);
EndPoint endpoint;
if (Format.TryParseInt32(parts[1], out int hashSlot)
string[] parts = result.GetString().Split(StringSplits.Space, 4);

if (Format.TryParseInt32(parts[1], out hashSlot)
&& (endpoint = Format.TryParseEndPoint(parts[2])) != null)
{
// no point sending back to same server, and no point sending to a dead server
Expand All @@ -231,6 +239,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
{
if (isMoved && wasNoRedirect)
{
hashslotMigratedAndNoRedirectError = true;
err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
}
else
Expand Down Expand Up @@ -258,6 +267,11 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
{
ConnectionFail(message, ConnectionFailureType.UnableToConnect, err);
}
// if this message has an error due to the hashslot being moved, and the command has been marked as NO REDIRECT.
else if(hashslotMigratedAndNoRedirectError && endpoint != null)
{
HashSlotMigratedAndNoRedirectAllowedFail(message, hashSlot, Format.ToString(endpoint), err);
}
else
{
ServerFail(message, err);
Expand Down
66 changes: 64 additions & 2 deletions tests/StackExchange.Redis.Tests/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ static string StringGet(IServer server, RedisKey key, CommandFlags flags = Comma
string b = StringGet(conn.GetServer(node.EndPoint), key);
Assert.Equal(value, b); // wrong primary, allow redirect

var ex = Assert.Throws<RedisServerException>(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect));
var ex = Assert.Throws<RedisHashslotMigratedAndNoRedirectException>(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect));
Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message);
}

Expand All @@ -188,7 +188,69 @@ static string StringGet(IServer server, RedisKey key, CommandFlags flags = Comma
string e = StringGet(conn.GetServer(node.EndPoint), key);
Assert.Equal(value, e); // wrong replica, allow redirect

var ex = Assert.Throws<RedisServerException>(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect));
var ex = Assert.Throws<RedisHashslotMigratedAndNoRedirectException>(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how much of an impact switching from RedisServerException to RedisHashslotMigratedAndNoRedirectException will have; it's a pretty specific criteria and error to cause this.

One strategy that could be used to mitigate the impact would be to make RedisHashslotMigratedAndNoRedirectException a subclass of RedisServerException (requires unsealing that class).

Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message);
}
}
}

[Fact]
public void IntentionalWrongServerForTransaction()
{
// Queues GET / SET / GET for the key in a single transaction, executes it against the given
// server, asserts that the transaction and the write succeeded, and returns the values read
// before and after the write (in that order).
static string[] TransactionalReplace(IServer server, RedisKey key, RedisValue newRedisValue, CommandFlags flags = CommandFlags.None)
{
    var tran = server.Multiplexer.GetDatabase().CreateTransaction();
    var target = new ServerEndPoint((ConnectionMultiplexer)server.Multiplexer, server.EndPoint);

    Task<RedisValue> before = tran.StringGetAsync(key, flags);
    Task<bool> written = tran.StringSetAsync(key, newRedisValue, null, false, When.Always, flags);
    Task<RedisValue> after = tran.StringGetAsync(key, flags);

    // ExecuteInternal lets the test pin the transaction to a specific server endpoint
    bool committed = ((RedisTransaction)tran).ExecuteInternal(flags, target);
    Assert.True(committed);
    Assert.True(written.Result);

    string beforeText = before.Result;
    string afterText = after.Result;
    return new[] { beforeText, afterText };
}

using (var conn = Create())
{
var endpoints = conn.GetEndPoints();
var servers = endpoints.Select(e => conn.GetServer(e)).ToList();

var key = Me();
const string value = "abc";
const string newValue = "def";
var db = conn.GetDatabase();
db.KeyDelete(key, CommandFlags.FireAndForget);
db.StringSet(key, value, flags: CommandFlags.FireAndForget);
servers[0].Ping();
var config = servers[0].ClusterConfiguration;
Assert.NotNull(config);
int slot = conn.HashSlot(key);
var rightPrimaryNode = config.GetBySlot(key);
Assert.NotNull(rightPrimaryNode);
Log("Right Primary: {0} {1}", rightPrimaryNode.EndPoint, rightPrimaryNode.NodeId);

string[] responses = TransactionalReplace(conn.GetServer(rightPrimaryNode.EndPoint), key, newValue);
Assert.Equal(value, responses[0]); // right primary
Assert.Equal(newValue, responses[1]);

db.KeyDelete(key, CommandFlags.FireAndForget);
db.StringSet(key, value, flags: CommandFlags.FireAndForget);

var node = config.Nodes.FirstOrDefault(x => !x.IsReplica && x.NodeId != rightPrimaryNode.NodeId);
Assert.NotNull(node);
Log("Using Primary: {0}", node.EndPoint, node.NodeId);
{
string[] otherResponses = TransactionalReplace(conn.GetServer(node.EndPoint), key, newValue);
Assert.Equal(value, otherResponses[0]); // right primary
Assert.Equal(newValue, otherResponses[1]);

var ex = Assert.Throws<RedisHashslotMigratedAndNoRedirectException>(() => TransactionalReplace(conn.GetServer(node.EndPoint), key, newValue, CommandFlags.NoRedirect));
Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message);
}
}
Expand Down