-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Fix for #1520: Add support for retry on transactions for resharding #2062
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9d35473
e9525e0
f5f8190
b76f6bf
23d9972
387490e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,5 @@ | ||
| ||
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException | ||
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.Endpoint.get -> string | ||
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.GetMovedErrorMessage() -> string | ||
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.HashSlot.get -> int | ||
StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.RedisHashslotMigratedAndNoRedirectException(string message, int hashSlot, string endpoint) -> void |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
using System; | ||
using System.Buffers; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Text; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
@@ -56,6 +58,12 @@ public Task<bool> ExecuteAsync(CommandFlags flags) | |
return base.ExecuteAsync(msg, proc); // need base to avoid our local wrapping override | ||
} | ||
|
||
internal bool ExecuteInternal(CommandFlags flags, ServerEndPoint endpoint = null) | ||
{ | ||
var msg = CreateMessage(flags, out ResultProcessor<bool> proc); | ||
return base.ExecuteSync(msg, proc, endpoint); // need base to avoid our local "not supported" override | ||
} | ||
|
||
internal override Task<T> ExecuteAsync<T>(Message message, ResultProcessor<T> processor, ServerEndPoint server = null) | ||
{ | ||
if (message == null) return CompletedTask<T>.Default(asyncState); | ||
|
@@ -146,6 +154,33 @@ public QueuedMessage(Message message) : base(message.Db, message.Flags | Command | |
Wrapped = message; | ||
} | ||
|
||
// for transactions, the inner operations should only be marked completed when the the final EXEC has been processed | ||
/// <summary> | ||
/// For queued messages (InnerOperations) in a the transaction, we cannot actually mark it complete until the exec | ||
/// function has returned with the state of the transaction. | ||
/// | ||
/// Calling the base complete resets the ResultBox, | ||
/// </summary> | ||
public override void Complete() | ||
{ | ||
// still need to activate continuations for GetMessages(), | ||
// which might be waiting for the last innerOperation to | ||
// complete. | ||
ResultBox?.ActivateContinuations(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understood the logic behind this correct, in order for the last section of the GetMessage() logic to work, where it waits on the last result box of the innerOperations, this needs to be fired. This ultimately means that in the lifetime of a QueuedMessage, it will fire/pulse twice. Once, when it's marked Complete() after it finishes it's call to REDIS, and then again when TransactionComplete() is called when the TransactionMessage runs it's Complete(). (which calls TransactionComplete() on all its innerOperations -- calling base.Complete()). |
||
} | ||
|
||
/// <summary> | ||
/// This is called when the transaction has been complete (Exec), and marks the operations as complete, updates performance, | ||
/// and clears the ResultBox. | ||
/// | ||
/// It also triggers the ActivateContinuations() for a second time. | ||
/// | ||
/// </summary> | ||
public void TransactionComplete() | ||
{ | ||
base.Complete(); | ||
} | ||
|
||
public bool WasQueued | ||
{ | ||
get => wasQueued; | ||
|
@@ -428,6 +463,21 @@ private bool AreAllConditionsSatisfied(ConnectionMultiplexer multiplexer) | |
} | ||
return result; | ||
} | ||
|
||
/// <summary> | ||
/// When the transaction completed, the innerOperations needs to be informed that the transaction has been completed. | ||
/// </summary> | ||
public override void Complete() | ||
{ | ||
// let all the inneroperations know that the transaction is complete | ||
foreach (var msg in InnerOperations) | ||
{ | ||
msg.TransactionComplete(); | ||
} | ||
|
||
// continue marking this message as complete. | ||
base.Complete(); | ||
} | ||
} | ||
|
||
private class TransactionProcessor : ResultProcessor<bool> | ||
|
@@ -439,13 +489,90 @@ public override bool SetResult(PhysicalConnection connection, Message message, i | |
if (result.IsError && message is TransactionMessage tran) | ||
{ | ||
string error = result.GetString(); | ||
bool isMoved = true; | ||
var migratedErrorMessage = new HashSet<string>(); | ||
RedisHashslotMigratedAndNoRedirectException migratedException; | ||
|
||
foreach (var op in tran.InnerOperations) | ||
{ | ||
var inner = op.Wrapped; | ||
ServerFail(inner, error); | ||
inner.Complete(); | ||
var opResultBox = op.ResultBox; | ||
// check if this internal operation errored out | ||
if (opResultBox.IsFaulted) | ||
{ | ||
// if this resultbox is one that allows us access to the error | ||
if (opResultBox is IResultBox<bool>) | ||
{ | ||
// get the error of the inner operation | ||
var simpleOpResultBox = opResultBox as IResultBox<bool>; | ||
Exception exception; | ||
simpleOpResultBox.GetResult(out exception); | ||
|
||
// append the inneroperation error to the transaction error | ||
error += "\n\n" + op.Command.ToString() + ": " + exception?.Message; | ||
|
||
// if the error is related to a hashslot being migrated, then add the error to a set. | ||
// if ALL the errors are related to this hashslot being moved, then it's possibly to retry | ||
// the transaction on the new endpoint | ||
if (exception is RedisHashslotMigratedAndNoRedirectException) | ||
{ | ||
migratedException = exception as RedisHashslotMigratedAndNoRedirectException; | ||
migratedErrorMessage.Add(migratedException.GetMovedErrorMessage()); | ||
} else | ||
{ | ||
isMoved = false; | ||
} | ||
|
||
} else | ||
{ | ||
error += "\n\n" + op.Command.ToString() + ": Undeterminted Error"; | ||
// have to assume it's false | ||
isMoved = false; | ||
} | ||
} | ||
} | ||
|
||
// all failed due to a hashslot move | ||
if (isMoved && migratedErrorMessage.Count > 0) | ||
{ | ||
// there should be a SINGLE MOVED error in the set (same endpoint and hashslot) | ||
if (migratedErrorMessage.Count == 1) | ||
{ | ||
// prepend the "MOVED" error to the start of the error, so the ResultProcessor | ||
// is able to detect it, and retry the transaction | ||
error = migratedErrorMessage.First() + " " + error; | ||
foreach (var op in tran.InnerOperations) | ||
{ | ||
// reset the state of the internal operations | ||
var wasQueued = SimpleResultBox<bool>.Create(); | ||
op.SetSource(wasQueued, QueuedProcessor.Default); | ||
} | ||
} | ||
// the transaction must have utilized multiple hashslots, with multiple ones that moved | ||
else | ||
{ | ||
isMoved = false; | ||
error = "Multiple hashslots and/or endpoints detected as MOVED in a single transaction \n\n" + error; | ||
} | ||
} | ||
|
||
// if this is not a recoverable MOVED error, | ||
if(!isMoved) | ||
{ | ||
// then mark all the inneroperation's wrapped operations as failed, and complete | ||
foreach (var op in tran.InnerOperations) | ||
{ | ||
var inner = op.Wrapped; | ||
ServerFail(inner, error); | ||
inner.Complete(); | ||
} | ||
} | ||
|
||
// take our updated error message, and pass it to the base ResultProcessor. | ||
var newResult = new ReadOnlySequence<byte>(Encoding.UTF8.GetBytes(error)); | ||
return base.SetResult(connection, message, new RawResult(ResultType.Error, newResult, false)); | ||
} | ||
|
||
// allow the base processor to process to result of the transaction | ||
return base.SetResult(connection, message, result); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -171,7 +171,7 @@ static string StringGet(IServer server, RedisKey key, CommandFlags flags = Comma | |
string b = StringGet(conn.GetServer(node.EndPoint), key); | ||
Assert.Equal(value, b); // wrong primary, allow redirect | ||
|
||
var ex = Assert.Throws<RedisServerException>(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect)); | ||
var ex = Assert.Throws<RedisHashslotMigratedAndNoRedirectException>(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect)); | ||
Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message); | ||
} | ||
|
||
|
@@ -188,7 +188,69 @@ static string StringGet(IServer server, RedisKey key, CommandFlags flags = Comma | |
string e = StringGet(conn.GetServer(node.EndPoint), key); | ||
Assert.Equal(value, e); // wrong replica, allow redirect | ||
|
||
var ex = Assert.Throws<RedisServerException>(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect)); | ||
var ex = Assert.Throws<RedisHashslotMigratedAndNoRedirectException>(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how much of an impact switching from RedisServerException to RedisHashslotMigratedAndNoRedirectException will have; it's a pretty specific criteria and error to cause this. One strategy that could be used to mitigate the impact would be to switch RedisHashslotMigratedAndNoRedirectException be a subclass of RedisServerException (requires unsealing the class). |
||
Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message); | ||
} | ||
} | ||
} | ||
|
||
[Fact] | ||
public void IntentionalWrongServerForTransaction() | ||
{ | ||
static string[] TransactionalReplace(IServer server, RedisKey key, RedisValue newRedisValue, CommandFlags flags = CommandFlags.None) | ||
{ | ||
var database = server.Multiplexer.GetDatabase(); | ||
var transaction = database.CreateTransaction(); | ||
var serverEndpoint = new ServerEndPoint((ConnectionMultiplexer) server.Multiplexer, server.EndPoint); | ||
|
||
Task<RedisValue> originalVal = transaction.StringGetAsync(key, flags); | ||
Task<bool> writeVal = transaction.StringSetAsync(key, newRedisValue, null, false, When.Always, flags); | ||
Task<RedisValue> newVal = transaction.StringGetAsync(key, flags); | ||
|
||
var result = ((RedisTransaction)transaction).ExecuteInternal(flags, serverEndpoint); | ||
Assert.True(result); | ||
Assert.True(writeVal.Result); | ||
|
||
return new string[] { | ||
originalVal.Result, newVal.Result | ||
}; | ||
} | ||
|
||
using (var conn = Create()) | ||
{ | ||
var endpoints = conn.GetEndPoints(); | ||
var servers = endpoints.Select(e => conn.GetServer(e)).ToList(); | ||
|
||
var key = Me(); | ||
const string value = "abc"; | ||
const string newValue = "def"; | ||
var db = conn.GetDatabase(); | ||
db.KeyDelete(key, CommandFlags.FireAndForget); | ||
db.StringSet(key, value, flags: CommandFlags.FireAndForget); | ||
servers[0].Ping(); | ||
var config = servers[0].ClusterConfiguration; | ||
Assert.NotNull(config); | ||
int slot = conn.HashSlot(key); | ||
var rightPrimaryNode = config.GetBySlot(key); | ||
Assert.NotNull(rightPrimaryNode); | ||
Log("Right Primary: {0} {1}", rightPrimaryNode.EndPoint, rightPrimaryNode.NodeId); | ||
|
||
string[] responses = TransactionalReplace(conn.GetServer(rightPrimaryNode.EndPoint), key, newValue); | ||
Assert.Equal(value, responses[0]); // right primary | ||
Assert.Equal(newValue, responses[1]); | ||
|
||
db.KeyDelete(key, CommandFlags.FireAndForget); | ||
db.StringSet(key, value, flags: CommandFlags.FireAndForget); | ||
|
||
var node = config.Nodes.FirstOrDefault(x => !x.IsReplica && x.NodeId != rightPrimaryNode.NodeId); | ||
Assert.NotNull(node); | ||
Log("Using Primary: {0}", node.EndPoint, node.NodeId); | ||
{ | ||
string[] otherResponses = TransactionalReplace(conn.GetServer(node.EndPoint), key, newValue); | ||
Assert.Equal(value, otherResponses[0]); // right primary | ||
Assert.Equal(newValue, otherResponses[1]); | ||
|
||
var ex = Assert.Throws<RedisHashslotMigratedAndNoRedirectException>(() => TransactionalReplace(conn.GetServer(node.EndPoint), key, newValue, CommandFlags.NoRedirect)); | ||
Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a huge fan of this, but I also didn't want to change the public API of this for the sake of making a functional test work.