Skip to content

Commit f408902

Browse files
committed
Add support for blocking XREAD and XREADGROUP
Issues redis#237 and redis#255.
1 parent db17b89 commit f408902

File tree

8 files changed

+628
-0
lines changed

8 files changed

+628
-0
lines changed

src/NRedisStack/CoreCommands/CoreCommandBuilder.cs

+75
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,81 @@ public static SerializedCommand BRPopLPush(RedisKey source, RedisKey destination
109109
return new SerializedCommand(RedisCoreCommands.BRPOPLPUSH, args);
110110
}
111111

112+
public static SerializedCommand XRead(RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds)
113+
{
114+
if (keys.Length == 0)
115+
{
116+
throw new ArgumentException("At least one key must be provided.");
117+
}
118+
119+
if (keys.Length != positions.Length)
120+
{
121+
throw new ArgumentException("The number of keys and positions must be the same.");
122+
}
123+
124+
List<object> args = new List<object>();
125+
126+
if (count != null)
127+
{
128+
args.Add(CoreArgs.COUNT);
129+
args.Add(count);
130+
}
131+
132+
if (timeoutMilliseconds != null)
133+
{
134+
args.Add(CoreArgs.BLOCK);
135+
args.Add(timeoutMilliseconds);
136+
}
137+
138+
args.Add(CoreArgs.STREAMS);
139+
args.AddRange(keys.Cast<object>());
140+
args.AddRange(positions.Cast<object>());
141+
142+
return new SerializedCommand(RedisCoreCommands.XREAD, args);
143+
}
144+
145+
public static SerializedCommand XReadGroup(RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count, int? timeoutMilliseconds, bool? noAcknowledge)
146+
{
147+
if (keys.Length == 0)
148+
{
149+
throw new ArgumentException("At least one key must be provided.");
150+
}
151+
152+
if (keys.Length != positions.Length)
153+
{
154+
throw new ArgumentException("The number of keys and positions must be the same.");
155+
}
156+
157+
List<object> args = new List<object>();
158+
159+
args.Add(CoreArgs.GROUP);
160+
args.Add(groupName);
161+
args.Add(consumerName);
162+
163+
if (count != null)
164+
{
165+
args.Add(CoreArgs.COUNT);
166+
args.Add(count);
167+
}
168+
169+
if (timeoutMilliseconds != null)
170+
{
171+
args.Add(CoreArgs.BLOCK);
172+
args.Add(timeoutMilliseconds);
173+
}
174+
175+
if (noAcknowledge != null && noAcknowledge.Value)
176+
{
177+
args.Add(CoreArgs.NOACK);
178+
}
179+
180+
args.Add(CoreArgs.STREAMS);
181+
args.AddRange(keys.Cast<object>());
182+
args.AddRange(positions.Cast<object>());
183+
184+
return new SerializedCommand(RedisCoreCommands.XREADGROUP, args);
185+
}
186+
112187
private static SerializedCommand BlockingCommandWithKeysAndTimeout(String command, RedisKey[] keys, double timeout)
113188
{
114189
if (keys.Length == 0)

src/NRedisStack/CoreCommands/CoreCommands.cs

+111
Original file line numberDiff line numberDiff line change
@@ -370,5 +370,116 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val
370370
var command = CoreCommandBuilder.BRPopLPush(source, destination, timeout);
371371
return db.Execute(command).ToRedisValue();
372372
}
373+
374+
/// <summary>
375+
/// The XREAD command.
376+
/// <para/>
377+
/// Read data from one or multiple streams, only returning entries with an ID greater than an ID provided by the caller.
378+
/// </summary>
379+
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
380+
/// <param name="keys">Keys of the streams where to read from.</param>
381+
/// <param name="positions">The positions from which to begin reading for each stream. See
382+
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
383+
/// <param name="count">The maximum number of messages to return from each stream.</param>
384+
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
385+
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
386+
/// <returns>A value of <see cref="RedisStreamEntries"/> for each stream, or <c>null</c> if the command times out
387+
/// on the server.</returns>
388+
/// <remarks>
389+
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamRead(StackExchange.Redis.StreamPosition[],System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
390+
/// <para><seealso href="https://redis.io/commands/xread"/></para>
391+
/// </remarks>
392+
public static RedisStreamEntries[]? XRead(this IDatabase db, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null)
393+
{
394+
var command = CoreCommandBuilder.XRead(keys, positions, count, timeoutMilliseconds);
395+
return db.Execute(command).ToRedisStreamEntries();
396+
}
397+
398+
/// <summary>
399+
/// Syntactic sugar for <see cref="XRead(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],StackExchange.Redis.RedisValue[],System.Nullable{int},System.Nullable{int})"/>,
400+
/// where only one stream is being read from.
401+
/// </summary>
402+
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
403+
/// <param name="key">Key of the stream where to read from.</param>
404+
/// <param name="position">The position from which to begin reading. See
405+
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
406+
/// <param name="count">The maximum number of messages to return from each stream.</param>
407+
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
408+
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
409+
/// <returns>A <see cref="StreamEntry"/> list with the data read from the stream, of <c>null</c> if the command
410+
/// times out on the server.</returns>
411+
/// <remarks>
412+
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamRead(StackExchange.Redis.RedisKey,StackExchange.Redis.RedisValue,System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
413+
/// <para><seealso href="https://redis.io/commands/xread"/></para>
414+
/// </remarks>
415+
public static StreamEntry[]? XRead(this IDatabase db, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null)
416+
{
417+
var result = XRead(db, new[] { key }, new[] { position }, count, timeoutMilliseconds);
418+
if (result == null || result.Length == 0)
419+
{
420+
return null;
421+
}
422+
return result[0].Entries;
423+
}
424+
425+
/// <summary>
426+
/// The XREADGROUP command.
427+
/// <para/>
428+
/// Read new or historical messages in one or several streams, for a consumer in a consumer group.
429+
/// </summary>
430+
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
431+
/// <param name="groupName">The consumer group name.</param>
432+
/// <param name="consumerName">The name of the consumer in the consumer group.</param>
433+
/// <param name="keys">Keys of the streams where to read from.</param>
434+
/// <param name="positions">The positions from which to begin reading for each stream. See
435+
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
436+
/// <param name="count">The maximum number of messages to return from each stream.</param>
437+
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
438+
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
439+
/// <param name="noAck">If set to <c>true</c> then inform the server that it should not wait for ACK for the
440+
/// messages it sends to this read call.</param>
441+
/// <returns>A value of <see cref="RedisStreamEntries"/> for each stream, or <c>null</c> if the command times out
442+
/// on the server.</returns>
443+
/// <remarks>
444+
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamReadGroup(StackExchange.Redis.StreamPosition[],StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
445+
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
446+
/// </remarks>
447+
public static RedisStreamEntries[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey[] keys, RedisValue[] positions, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null)
448+
{
449+
var command = CoreCommandBuilder.XReadGroup(groupName, consumerName, keys, positions, count, timeoutMilliseconds, noAck);
450+
return db.Execute(command).ToRedisStreamEntries();
451+
}
452+
453+
/// <summary>
454+
/// Syntactic sugar for <see cref="XReadGroup(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisKey[],StackExchange.Redis.RedisValue[],System.Nullable{int},System.Nullable{int},System.Nullable{bool})"/>,
455+
/// where only one stream is being read from.
456+
/// </summary>
457+
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
458+
/// <param name="groupName">The consumer group name.</param>
459+
/// <param name="consumerName">The name of the consumer in the consumer group.</param>
460+
/// <param name="key">Key of the stream where to read from.</param>
461+
/// <param name="position">The position from which to begin reading. See
462+
/// <see cref="NRedisStack.Core.DataTypes.StreamSpecialIds"/> for special Ids that can be used.</param>
463+
/// <param name="count">The maximum number of messages to return from each stream.</param>
464+
/// <param name="timeoutMilliseconds">Amount of time in milliseconds to block in case all the streams are empty.
465+
/// If not provided, or set to <c>null</c> then the read does not block. If set to <c>0</c> then it blocks indefinitely.</param>
466+
/// <param name="noAck">If set to <c>true</c> then inform the server that it should not wait for ACK for the
467+
/// messages it sends to this read call.</param>
468+
/// <returns>A <see cref="StreamEntry"/> list with the data read from the stream, of <c>null</c> if the command
469+
/// times out on the server.</returns>
470+
/// <remarks>
471+
/// <para>This is the blocking alternative for <seealso cref="IDatabase.StreamReadGroup(StackExchange.Redis.RedisKey,StackExchange.Redis.RedisValue,StackExchange.Redis.RedisValue,System.Nullable{StackExchange.Redis.RedisValue},System.Nullable{int},StackExchange.Redis.CommandFlags)"/>.</para>
472+
/// <para><seealso href="https://redis.io/commands/xreadgroup"/></para>
473+
/// </remarks>
474+
public static StreamEntry[]? XReadGroup(this IDatabase db, RedisValue groupName, RedisValue consumerName, RedisKey key, RedisValue position, int? count = null, int? timeoutMilliseconds = null, bool? noAck = null)
475+
{
476+
var result = XReadGroup(db, groupName, consumerName, new[] { key }, new[] { position }, count, timeoutMilliseconds, noAck);
477+
if (result == null || result.Length == 0)
478+
{
479+
return null;
480+
}
481+
return result[0].Entries;
482+
}
483+
373484
}
374485
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using StackExchange.Redis;
2+
3+
namespace NRedisStack.Core.DataTypes;
4+
5+
/// <summary>
6+
/// Holds the key and the entries for a Redis Stream, as returned by, for example, the XREAD or the XREADGROUP commands.
7+
/// </summary>
8+
public readonly struct RedisStreamEntries
9+
{
10+
internal RedisStreamEntries(RedisKey key, StreamEntry[] entries)
11+
{
12+
Key = key;
13+
Entries = entries;
14+
}
15+
16+
/// <summary>
17+
/// The key for the stream.
18+
/// </summary>
19+
public RedisKey Key { get; }
20+
21+
/// <summary>
22+
/// An array of entries contained within the stream.
23+
/// </summary>
24+
public StreamEntry[] Entries { get; }
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
namespace NRedisStack.Core.DataTypes;
2+
3+
/// <summary>
4+
/// Constants for special stream Ids, to be used, for example, with the XREAD and XREADGROUP commands
5+
/// </summary>
6+
public class StreamSpecialIds
7+
{
8+
/// <summary>
9+
/// Smallest incomplete ID, can be used for reading from the very first message in a stream.
10+
/// </summary>
11+
public const string AllMessagesId = "0";
12+
13+
/// <summary>
14+
/// For receiving only new messages that arrive after blocking on a read.
15+
/// </summary>
16+
public const string NewMessagesId = "$";
17+
18+
/// <summary>
19+
/// For receiving only messages that were never delivered to any other consumer.
20+
/// </summary>
21+
public const string UndeliveredMessagesId = ">";
22+
}

src/NRedisStack/CoreCommands/Literals/CommandArgs.cs

+4
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@ namespace NRedisStack.Core.Literals
22
{
33
internal static class CoreArgs
44
{
5+
public const string BLOCK = "BLOCK";
56
public const string COUNT = "COUNT";
7+
public const string GROUP = "GROUP";
68
public const string LEFT = "LEFT";
79
public const string MAX = "MAX";
810
public const string MIN = "MIN";
11+
public const string NOACK = "NOACK";
912
public const string RIGHT = "RIGHT";
13+
public const string STREAMS = "STREAMS";
1014
public const string lib_name = "LIB-NAME";
1115
public const string lib_ver = "LIB-VER";
1216
}

src/NRedisStack/CoreCommands/Literals/Commands.cs

+2
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@ internal static class RedisCoreCommands
1515
public const string BZPOPMIN = "BZPOPMIN";
1616
public const string CLIENT = "CLIENT";
1717
public const string SETINFO = "SETINFO";
18+
public const string XREAD = "XREAD";
19+
public const string XREADGROUP = "XREADGROUP";
1820
}
1921
}

src/NRedisStack/ResponseParser.cs

+51
Original file line numberDiff line numberDiff line change
@@ -802,5 +802,56 @@ public static Dictionary<string, RedisResult>[] ToDictionarys(this RedisResult r
802802

803803
return new Tuple<RedisKey, List<RedisValue>>(resultKey, values);
804804
}
805+
806+
public static RedisStreamEntries[]? ToRedisStreamEntries(this RedisResult result)
807+
{
808+
if (result.IsNull)
809+
{
810+
return null;
811+
}
812+
813+
var resultArray = (RedisResult[])result!;
814+
RedisStreamEntries[] redisStreamEntries = new RedisStreamEntries[resultArray.Length];
815+
for (int i = 0; i < resultArray.Length; i++)
816+
{
817+
RedisResult[] streamResultArray = (RedisResult[])resultArray[i]!;
818+
RedisKey streamKey = streamResultArray[0].ToRedisKey();
819+
StreamEntry[] streamEntries = ParseStreamEntries(streamResultArray[1].ToArray());
820+
redisStreamEntries[i] = new RedisStreamEntries(streamKey, streamEntries);
821+
}
822+
823+
return redisStreamEntries;
824+
}
825+
826+
private static StreamEntry[] ParseStreamEntries(IReadOnlyList<RedisResult> results)
827+
{
828+
int count = results.Count;
829+
StreamEntry[] streamEntries = new StreamEntry[count];
830+
831+
for (int i = 0; i < count; i++)
832+
{
833+
RedisResult[] streamEntryArray = (RedisResult[])results[i]!;
834+
RedisValue key = streamEntryArray[0].ToRedisValue();
835+
NameValueEntry[] nameValueEntries = ParseNameValueEntries(streamEntryArray[1].ToArray());
836+
streamEntries[i] = new StreamEntry(key, nameValueEntries);
837+
}
838+
839+
return streamEntries;
840+
}
841+
842+
private static NameValueEntry[] ParseNameValueEntries(IReadOnlyList<RedisResult> redisResults)
843+
{
844+
int count = redisResults.Count / 2;
845+
var nameValueEntries = new NameValueEntry[count];
846+
847+
for (int i = 0; i < count; i++)
848+
{
849+
nameValueEntries[i] = new NameValueEntry(
850+
redisResults[2 * i].ToRedisValue(),
851+
redisResults[2 * i + 1].ToRedisValue());
852+
}
853+
854+
return nameValueEntries;
855+
}
805856
}
806857
}

0 commit comments

Comments
 (0)