Skip to content

Commit a4a3058

Browse files
authored
Fix ordered consumer creation (#746)
* Fix ordered consumer creation * Add exponential backoff with jitter for retries Introduce `BackoffWithJitterAsync` to calculate delay with jitter, improving retry logic. Update `NatsJSOrderedConsumer` to utilize the new method during connection reset attempts.
1 parent e899639 commit a4a3058

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

src/NATS.Client.Core/NatsOpts.cs

+21
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
using Microsoft.Extensions.Logging;
55
using Microsoft.Extensions.Logging.Abstractions;
66
using NATS.Client.Core.Internal;
7+
#if NETSTANDARD
8+
using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random;
9+
#endif
710

811
namespace NATS.Client.Core;
912

@@ -186,3 +189,21 @@ internal NatsOpts ReadUserInfoFromConnectionString()
186189
}
187190
}
188191
}
192+
193+
public static class NatsOptsExtensions
194+
{
195+
/// <summary>
196+
/// Applies an exponential backoff delay with an added random jitter for attempts.
197+
/// </summary>
198+
/// <param name="opts">The NatsOpts instance containing configuration settings for intervals and jitter.</param>
199+
/// <param name="iter">The current attempt iteration, used to calculate the exponential delay.</param>
200+
/// <returns>A task that completes after the calculated delay time has elapsed.</returns>
201+
public static Task BackoffWithJitterAsync(this NatsOpts opts, int iter)
202+
{
203+
var baseDelay = opts.ReconnectWaitMin.TotalMilliseconds * Math.Pow(2, iter - 1);
204+
var jitter = opts.ReconnectJitter.TotalMilliseconds * Random.Shared.NextDouble();
205+
206+
var delay = Math.Min(baseDelay + jitter, opts.ReconnectWaitMax.TotalMilliseconds);
207+
return Task.Delay(TimeSpan.FromMilliseconds(delay));
208+
}
209+
}

src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs

+8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
using Microsoft.Extensions.Logging;
33
using NATS.Client.Core;
44
using NATS.Client.JetStream.Models;
5+
#if NETSTANDARD
6+
using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random;
7+
#endif
58

69
namespace NATS.Client.JetStream;
710

@@ -314,11 +317,16 @@ private async Task<NatsJSConsumer> RecreateConsumer(string consumer, ulong seq,
314317
catch (NatsJSApiNoResponseException)
315318
{
316319
}
320+
catch (NatsJSApiException apiException) when (apiException.Error.Code == 503)
321+
{
322+
}
317323

318324
if (i == _opts.MaxResetAttempts)
319325
{
320326
throw new NatsJSException("Maximum number of create attempts reached.");
321327
}
328+
329+
await _context.Connection.Opts.BackoffWithJitterAsync(i);
322330
}
323331

324332
Info = info;

0 commit comments

Comments
 (0)