Skip to content

Commit e12bfc3

Browse files
authored
Add priority group options (overflow) (#743)
* Add priority group options * Add support for priority groups in pull consumers This commit introduces priority group configuration for JetStream pull consumers. Implements behavior for `NextAsync`, `FetchAsync`, and `ConsumeAsync` with priority group settings. Unsupported cases like multiple groups and certain priority policies are handled with exceptions. * Fix format
1 parent 502a806 commit e12bfc3

File tree

8 files changed

+320
-1
lines changed

8 files changed

+320
-1
lines changed

src/NATS.Client.JetStream/Internal/NatsJSConsume.cs

+9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ internal class NatsJSConsume<TMsg> : NatsSubBase
2727
private readonly string _consumer;
2828
private readonly CancellationToken _cancellationToken;
2929
private readonly INatsDeserialize<TMsg> _serializer;
30+
private readonly NatsJSPriorityGroupOpts? _priorityGroup;
3031
private readonly Timer _timer;
3132
private readonly Task _pullTask;
3233
private readonly NatsJSNotificationChannel? _notificationChannel;
@@ -59,6 +60,7 @@ public NatsJSConsume(
5960
Func<INatsJSNotification, CancellationToken, Task>? notificationHandler,
6061
INatsDeserialize<TMsg> serializer,
6162
NatsSubOpts? opts,
63+
NatsJSPriorityGroupOpts? priorityGroup,
6264
CancellationToken cancellationToken)
6365
: base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts)
6466
{
@@ -69,6 +71,7 @@ public NatsJSConsume(
6971
_stream = stream;
7072
_consumer = consumer;
7173
_serializer = serializer;
74+
_priorityGroup = priorityGroup;
7275

7376
if (notificationHandler is { } handler)
7477
{
@@ -231,6 +234,9 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
231234
MaxBytes = maxBytes,
232235
IdleHeartbeat = _idle,
233236
Expires = _expires,
237+
Group = _priorityGroup?.Group,
238+
MinPending = _priorityGroup?.MinPending ?? 0,
239+
MinAckPending = _priorityGroup?.MinAckPending ?? 0,
234240
};
235241

236242
await commandWriter.PublishAsync(
@@ -455,6 +461,9 @@ private void Pull(string origin, long batch, long maxBytes) => _pullRequests.Wri
455461
MaxBytes = maxBytes,
456462
IdleHeartbeat = _idle,
457463
Expires = _expires,
464+
Group = _priorityGroup?.Group,
465+
MinPending = _priorityGroup?.MinPending ?? 0,
466+
MinAckPending = _priorityGroup?.MinAckPending ?? 0,
458467
},
459468
Origin = origin,
460469
});

src/NATS.Client.JetStream/Internal/NatsJSFetch.cs

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ internal class NatsJSFetch<TMsg> : NatsSubBase
1818
private readonly string _stream;
1919
private readonly string _consumer;
2020
private readonly INatsDeserialize<TMsg> _serializer;
21+
private readonly NatsJSPriorityGroupOpts? _priorityGroup;
2122
private readonly Timer _hbTimer;
2223
private readonly Timer _expiresTimer;
2324
private readonly NatsJSNotificationChannel? _notificationChannel;
@@ -45,6 +46,7 @@ public NatsJSFetch(
4546
Func<INatsJSNotification, CancellationToken, Task>? notificationHandler,
4647
INatsDeserialize<TMsg> serializer,
4748
NatsSubOpts? opts,
49+
NatsJSPriorityGroupOpts? priorityGroup,
4850
CancellationToken cancellationToken)
4951
: base(context.Connection, context.Connection.SubscriptionManager, subject, queueGroup, opts)
5052
{
@@ -54,6 +56,7 @@ public NatsJSFetch(
5456
_stream = stream;
5557
_consumer = consumer;
5658
_serializer = serializer;
59+
_priorityGroup = priorityGroup;
5760

5861
if (notificationHandler is { } handler)
5962
{
@@ -169,6 +172,9 @@ internal override async ValueTask WriteReconnectCommandsAsync(CommandWriter comm
169172
Batch = _maxMsgs,
170173
IdleHeartbeat = _idle,
171174
Expires = _expires,
175+
Group = _priorityGroup?.Group,
176+
MinPending = _priorityGroup?.MinPending ?? 0,
177+
MinAckPending = _priorityGroup?.MinAckPending ?? 0,
172178
};
173179

174180
await commandWriter.PublishAsync(

src/NATS.Client.JetStream/Models/ConsumerConfig.cs

+40
Original file line numberDiff line numberDiff line change
@@ -254,4 +254,44 @@ public ConsumerConfig(string name)
254254
[System.Text.Json.Serialization.JsonPropertyName("metadata")]
255255
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
256256
public IDictionary<string, string>? Metadata { get; set; }
257+
258+
/// <summary>
259+
/// Defines a collection of priority groups for a pull consumer.
260+
/// </summary>
261+
/// <remarks>
262+
/// The priority group names must conform to the requirements outlined in ADR-6: alphanumeric characters, dashes, underscores, forward slashes, or equals signs are allowed, with a maximum length of 16 characters per group.
263+
/// Configuring this property for a push consumer will result in an error.
264+
/// </remarks>
265+
[System.Text.Json.Serialization.JsonPropertyName("priority_groups")]
266+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
267+
#if NET6_0
268+
public ICollection<string>? PriorityGroups { get; set; }
269+
#else
270+
public ICollection<string>? PriorityGroups { get; init; }
271+
#endif
272+
273+
/// <summary>
274+
/// Specifies the priority policy for consumer message selection, such as prioritizing <c>overflow</c> or <c>pinned_client</c>.
275+
/// </summary>
276+
[System.Text.Json.Serialization.JsonPropertyName("priority_policy")]
277+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
278+
[System.ComponentModel.DataAnnotations.StringLength(int.MaxValue, MinimumLength = 1)]
279+
[System.ComponentModel.DataAnnotations.RegularExpression(@"^[^.*>]+$")]
280+
#if NET6_0
281+
public string? PriorityPolicy { get; set; }
282+
#else
283+
public string? PriorityPolicy { get; init; }
284+
#endif
285+
286+
/// <summary>
287+
/// Specifies the duration for which the consumer's pinned message priority is maintained.
288+
/// </summary>
289+
[System.Text.Json.Serialization.JsonPropertyName("priority_timeout")]
290+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
291+
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonNullableNanosecondsConverter))]
292+
#if NET6_0
293+
public TimeSpan? PinnedTTL { get; set; }
294+
#else
295+
public TimeSpan? PinnedTTL { get; init; }
296+
#endif
257297
}

src/NATS.Client.JetStream/Models/ConsumerGetnextRequest.cs

+36
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,40 @@ public record ConsumerGetnextRequest
4646
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
4747
[System.Text.Json.Serialization.JsonConverter(typeof(NatsJSJsonNanosecondsConverter))]
4848
public TimeSpan IdleHeartbeat { get; set; }
49+
50+
/// <summary>
51+
/// Priority group.
52+
/// </summary>
53+
[System.Text.Json.Serialization.JsonPropertyName("group")]
54+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
55+
public string? Group { get; set; }
56+
57+
/// <summary>
58+
/// Priority group minimum pending messages.
59+
/// </summary>
60+
/// <remarks>
61+
/// When specified, this Pull request will only receive messages when the consumer has at least this many pending messages.
62+
/// </remarks>
63+
[System.Text.Json.Serialization.JsonPropertyName("min_pending")]
64+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
65+
[System.ComponentModel.DataAnnotations.Range(long.MinValue, long.MaxValue)]
66+
public long MinPending { get; set; }
67+
68+
/// <summary>
69+
/// Priority group minimum ACK pending messages.
70+
/// </summary>
71+
/// <remarks>
72+
/// When specified, this Pull request will only receive messages when the consumer has at least this many ack pending messages.
73+
/// </remarks>
74+
[System.Text.Json.Serialization.JsonPropertyName("min_ack_pending")]
75+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
76+
[System.ComponentModel.DataAnnotations.Range(long.MinValue, long.MaxValue)]
77+
public long MinAckPending { get; set; }
78+
79+
/// <summary>
80+
/// Priority group ID.
81+
/// </summary>
82+
[System.Text.Json.Serialization.JsonPropertyName("id")]
83+
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
84+
public string? Id { get; set; }
4985
}

src/NATS.Client.JetStream/NatsJSConsumer.cs

+18-1
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
145145
IdleHeartbeat = opts.IdleHeartbeat,
146146
Expires = opts.Expires,
147147
NotificationHandler = opts.NotificationHandler,
148+
PriorityGroup = opts.PriorityGroup,
148149
},
149150
serializer,
150151
cancellationToken: cancellationToken).ConfigureAwait(false);
@@ -306,6 +307,7 @@ internal async ValueTask<NatsJSConsume<T>> ConsumeInternalAsync<T>(INatsDeserial
306307
expires: timeouts.Expires,
307308
idle: timeouts.IdleHeartbeat,
308309
notificationHandler: opts.NotificationHandler,
310+
priorityGroup: opts.PriorityGroup,
309311
cancellationToken: cancellationToken);
310312

311313
await _context.Connection.AddSubAsync(sub: sub, cancellationToken).ConfigureAwait(false);
@@ -319,6 +321,9 @@ await sub.CallMsgNextAsync(
319321
MaxBytes = max.MaxBytes,
320322
IdleHeartbeat = timeouts.IdleHeartbeat,
321323
Expires = timeouts.Expires,
324+
Group = opts.PriorityGroup?.Group,
325+
MinPending = opts.PriorityGroup?.MinPending ?? 0,
326+
MinAckPending = opts.PriorityGroup?.MinAckPending ?? 0,
322327
},
323328
cancellationToken).ConfigureAwait(false);
324329

@@ -402,6 +407,7 @@ internal async ValueTask<NatsJSFetch<T>> FetchInternalAsync<T>(
402407
expires: timeouts.Expires,
403408
notificationHandler: opts.NotificationHandler,
404409
idle: timeouts.IdleHeartbeat,
410+
priorityGroup: opts.PriorityGroup,
405411
cancellationToken: cancellationToken);
406412

407413
await _context.Connection.AddSubAsync(sub: sub, cancellationToken).ConfigureAwait(false);
@@ -412,14 +418,25 @@ await sub.CallMsgNextAsync(
412418
// When no wait is set we don't need to send the idle heartbeat and expiration
413419
// If no message is available the server will respond with a 404 immediately
414420
// If messages are available the server will send a 408 direct after the last message
415-
? new ConsumerGetnextRequest { Batch = max.MaxMsgs, MaxBytes = max.MaxBytes, NoWait = opts.NoWait }
421+
? new ConsumerGetnextRequest
422+
{
423+
Batch = max.MaxMsgs,
424+
MaxBytes = max.MaxBytes,
425+
NoWait = opts.NoWait,
426+
Group = opts.PriorityGroup?.Group,
427+
MinPending = opts.PriorityGroup?.MinPending ?? 0,
428+
MinAckPending = opts.PriorityGroup?.MinAckPending ?? 0,
429+
}
416430
: new ConsumerGetnextRequest
417431
{
418432
Batch = max.MaxMsgs,
419433
MaxBytes = max.MaxBytes,
420434
IdleHeartbeat = timeouts.IdleHeartbeat,
421435
Expires = timeouts.Expires,
422436
NoWait = opts.NoWait,
437+
Group = opts.PriorityGroup?.Group,
438+
MinPending = opts.PriorityGroup?.MinPending ?? 0,
439+
MinAckPending = opts.PriorityGroup?.MinAckPending ?? 0,
423440
},
424441
cancellationToken).ConfigureAwait(false);
425442

src/NATS.Client.JetStream/NatsJSContext.Consumers.cs

+19
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,25 @@ private async ValueTask<NatsJSConsumer> CreateOrUpdateConsumerInternalAsync(
255255
subject += $".{config.FilterSubject}";
256256
}
257257

258+
// ADR-42: In the initial implementation we should limit PriorityGroups to one per consumer only
259+
// and error should one be made with multiple groups. In future iterations multiple groups will
260+
// be supported along with dynamic partitioning of stream data.
261+
if (config.PriorityGroups != null && config.PriorityGroups.Count != 1)
262+
{
263+
throw new NatsJSException("Cannot create consumers with multiple priority groups.");
264+
}
265+
266+
if (config.PriorityPolicy is "pinned_client")
267+
{
268+
throw new NotImplementedException("Pinned clients are not supported yet.");
269+
}
270+
271+
// TODO: enum these values?
272+
if (config.PriorityPolicy != null && config.PriorityPolicy != "none" && config.PriorityPolicy != "overflow" && config.PriorityPolicy != "pinned_client")
273+
{
274+
throw new NatsJSException("Cannot create consumers with priority policy other than 'overflow', 'pinned_client', or 'none'.");
275+
}
276+
258277
var response = await JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
259278
subject: subject,
260279
new ConsumerCreateRequest

src/NATS.Client.JetStream/NatsJSOpts.cs

+39
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,12 @@ public record NatsJSConsumeOpts
116116
public int? ThresholdBytes { get; init; }
117117

118118
public Func<INatsJSNotification, CancellationToken, Task>? NotificationHandler { get; init; }
119+
120+
/// <summary>
121+
/// Optional priority group configuration used for consuming messages.
122+
/// Defines a group name and constraints for minimum pending messages and acknowledgments.
123+
/// </summary>
124+
public NatsJSPriorityGroupOpts? PriorityGroup { get; init; }
119125
}
120126

121127
/// <summary>
@@ -134,6 +140,12 @@ public record NatsJSNextOpts
134140
public TimeSpan? IdleHeartbeat { get; init; }
135141

136142
public Func<INatsJSNotification, CancellationToken, Task>? NotificationHandler { get; init; }
143+
144+
/// <summary>
145+
/// Optional priority group configuration used for consuming messages.
146+
/// Defines a group name and constraints for minimum pending messages and acknowledgments.
147+
/// </summary>
148+
public NatsJSPriorityGroupOpts? PriorityGroup { get; init; }
137149
}
138150

139151
/// <summary>
@@ -163,6 +175,12 @@ public record NatsJSFetchOpts
163175

164176
public Func<INatsJSNotification, CancellationToken, Task>? NotificationHandler { get; init; }
165177

178+
/// <summary>
179+
/// Optional priority group configuration used for consuming messages.
180+
/// Defines a group name and constraints for minimum pending messages and acknowledgments.
181+
/// </summary>
182+
public NatsJSPriorityGroupOpts? PriorityGroup { get; init; }
183+
166184
/// <summary>
167185
/// Does not wait for messages to be available
168186
/// </summary>
@@ -196,3 +214,24 @@ public record NatsJSPubOpts : NatsPubOpts
196214
// rnum int // Retry attempts
197215
public int RetryAttempts { get; init; } = 2;
198216
}
217+
218+
/// <summary>
219+
/// Represents options for configuring a priority group within JetStream operations.
220+
/// </summary>
221+
public record NatsJSPriorityGroupOpts
222+
{
223+
/// <summary>
224+
/// Specifies the group name for prioritization in JetStream consumer operations.
225+
/// </summary>
226+
public string? Group { get; init; }
227+
228+
/// <summary>
229+
/// When specified, this Pull request will only receive messages when the consumer has at least this many pending messages.
230+
/// </summary>
231+
public long MinPending { get; set; }
232+
233+
/// <summary>
234+
/// When specified, this Pull request will only receive messages when the consumer has at least this many ack pending messages.
235+
/// </summary>
236+
public long MinAckPending { get; set; }
237+
}

0 commit comments

Comments
 (0)