Skip to content

Commit

Permalink
Add support for BZMPOP, BZPOPMIN and BZPOPMAX
Browse files Browse the repository at this point in the history
Add support for the BZMPOP, BZPOPMIN, BZPOPMAX commands.

Issues #232, #233 and #234.

These commands are blocking on the server, so they go against the current
policy of the StackExchange.Redis library. Therefore make it obvious in
the code documentation that attention must be given to the timeout in
the connection multiplexer, client-side.

The StackExchange.Redis library already defines a type for the payload
returned by BZMPOP (which is the same as for ZMPOP), namely the
SortedSetPopResult class. However, the constructor of that class is
internal in the library, so we can't create instances of it. Therefore
roll our out type for a <value, score> pair, and use Tuple to pair a key
with a list of such <value, score> pairs.

Instead of using Order to signal from which end of the sorted set to
pop, define a MinMaxModifier enum, which more clearly expresses the
intention and maps directly to the Redis command being executed.
  • Loading branch information
gerzse committed Feb 6, 2024
1 parent e6da72f commit 410c474
Show file tree
Hide file tree
Showing 8 changed files with 603 additions and 5 deletions.
53 changes: 53 additions & 0 deletions src/NRedisStack/CoreCommands/CoreCommandBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using NRedisStack.RedisStackCommands;
using NRedisStack.Core.Literals;
using NRedisStack.Core;
using NRedisStack.Core.DataTypes;
using StackExchange.Redis;

namespace NRedisStack
{
Expand All @@ -18,5 +20,56 @@ public static SerializedCommand ClientSetInfo(SetInfoAttr attr, string value)

return new SerializedCommand(RedisCoreCommands.CLIENT, RedisCoreCommands.SETINFO, attrValue, value);
}

public static SerializedCommand BzmPop(double timeout, RedisKey[] keys, MinMaxModifier minMaxModifier, long? count)
{
if (keys.Length == 0)
{
throw new ArgumentException("At least one key must be provided.");
}

List<object> args = new List<object>();

args.Add(timeout);
args.Add(keys.Length);
args.AddRange(keys.Cast<object>());
args.Add(minMaxModifier == MinMaxModifier.Min ? CoreArgs.MIN : CoreArgs.MAX);

if (count != null)
{
args.Add(CoreArgs.COUNT);
args.Add(count);
}

return new SerializedCommand(RedisCoreCommands.BZMPOP, args);
}

public static SerializedCommand BzPopMin(RedisKey[] keys, double timeout)
{
if (keys.Length == 0)
{
throw new ArgumentException("At least one key must be provided.");
}

List<object> args = new List<object>();
args.AddRange(keys.Cast<object>());
args.Add(timeout);

return new SerializedCommand(RedisCoreCommands.BZPOPMIN, args);
}

public static SerializedCommand BzPopMax(RedisKey[] keys, double timeout)
{
if (keys.Length == 0)
{
throw new ArgumentException("At least one key must be provided.");
}

List<object> args = new List<object>();
args.AddRange(keys.Cast<object>());
args.Add(timeout);

return new SerializedCommand(RedisCoreCommands.BZPOPMAX, args);
}
}
}
156 changes: 156 additions & 0 deletions src/NRedisStack/CoreCommands/CoreCommands.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using NRedisStack.Core;
using NRedisStack.Core.DataTypes;
using StackExchange.Redis;

namespace NRedisStack
{

Expand All @@ -19,5 +21,159 @@ public static bool ClientSetInfo(this IDatabase db, SetInfoAttr attr, string val
return false;
return db.Execute(CoreCommandBuilder.ClientSetInfo(attr, value)).OKtoBoolean();
}

/// <summary>
/// The BZMPOP command.
/// <p/>
/// Removes and returns up to <paramref name="count"/> entries from the first non-empty sorted set in
/// <paramref name="keys"/>. If none of the sets contain elements, the call blocks on the server until elements
/// become available, or the given <paramref name="timeout"/> expires. A <paramref name="timeout"/> of <c>0</c>
/// means to wait indefinitely server-side. Returns <c>null</c> if the server timeout expires.
/// <p/>
/// When using this, pay attention to the timeout configured in the client, on the
/// <see cref="ConnectionMultiplexer"/>, which by default can be too small:
/// <code>
/// ConfigurationOptions configurationOptions = new ConfigurationOptions();
/// configurationOptions.SyncTimeout = 120000; // set a meaningful value here
/// configurationOptions.EndPoints.Add("localhost");
/// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions);
/// </code>
/// If the connection multiplexer timeout expires in the client, a <c>StackExchange.Redis.RedisTimeoutException</c>
/// is thrown.
/// <p/>
/// This is an extension method added to the <see cref="IDatabase"/> class, for convenience.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="timeout">Server-side timeout for the wait. A value of <c>0</c> means to wait indefinitely.</param>
/// <param name="keys">The keys to check.</param>
/// <param name="minMaxModifier">Specify from which end of the sorted set to pop values. If set to <c>MinMaxModifier.Min</c>
/// then the minimum elements will be popped, otherwise the maximum values.</param>
/// <param name="count">The maximum number of records to pop out. If set to <c>null</c> then the server default
/// will be used.</param>
/// <returns>A collection of sorted set entries paired with their scores, together with the key they were popped
/// from, or <c>null</c> if the server timeout expires.</returns>
/// <remarks><seealso href="https://redis.io/commands/bzmpop"/></remarks>
public static Tuple<RedisKey, List<RedisValueWithScore>>? BzmPop(this IDatabase db, double timeout, RedisKey[] keys, MinMaxModifier minMaxModifier, long? count = null)
{
var command = CoreCommandBuilder.BzmPop(timeout, keys, minMaxModifier, count);
return db.Execute(command).ToSortedSetPopResults();
}

/// <summary>
/// Syntactic sugar for
/// <see cref="BzmPop(StackExchange.Redis.IDatabase,double,StackExchange.Redis.RedisKey[],NRedisStack.Core.DataTypes.MinMaxModifier,System.Nullable{long})"/>,
/// where only one key is used.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="timeout">Server-side timeout for the wait. A value of <c>0</c> means to wait indefinitely.</param>
/// <param name="key">The key to check.</param>
/// <param name="minMaxModifier">Specify from which end of the sorted set to pop values. If set to <c>MinMaxModifier.Min</c>
/// then the minimum elements will be popped, otherwise the maximum values.</param>
/// <param name="count">The maximum number of records to pop out. If set to <c>null</c> then the server default
/// will be used.</param>
/// <returns>A collection of sorted set entries paired with their scores, together with the key they were popped
/// from, or <c>null</c> if the server timeout expires.</returns>
/// <remarks><seealso href="https://redis.io/commands/bzmpop"/></remarks>
public static Tuple<RedisKey, List<RedisValueWithScore>>? BzmPop(this IDatabase db, double timeout, RedisKey key, MinMaxModifier minMaxModifier, long? count = null)
{
return BzmPop(db, timeout, new[] { key }, minMaxModifier, count);
}

/// <summary>
/// The BZPOPMIN command.
/// <p/>
/// Removes and returns the entry with the smallest score from the first non-empty sorted set in
/// <paramref name="keys"/>. If none of the sets contain elements, the call blocks on the server until elements
/// become available, or the given <paramref name="timeout"/> expires. A <paramref name="timeout"/> of <c>0</c>
/// means to wait indefinitely server-side. Returns <c>null</c> if the server timeout expires.
/// <p/>
/// When using this, pay attention to the timeout configured in the client, on the
/// <see cref="ConnectionMultiplexer"/>, which by default can be too small:
/// <code>
/// ConfigurationOptions configurationOptions = new ConfigurationOptions();
/// configurationOptions.SyncTimeout = 120000; // set a meaningful value here
/// configurationOptions.EndPoints.Add("localhost");
/// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions);
/// </code>
/// If the connection multiplexer timeout expires in the client, a <c>StackExchange.Redis.RedisTimeoutException</c>
/// is thrown.
/// <p/>
/// This is an extension method added to the <see cref="IDatabase"/> class, for convenience.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="keys">The keys to check.</param>
/// <param name="timeout">Server-side timeout for the wait. A value of <c>0</c> means to wait indefinitely.</param>
/// <returns>A sorted set entry paired with its score, together with the key it was popped from, or <c>null</c>
/// if the server timeout expires.</returns>
/// <remarks><seealso href="https://redis.io/commands/bzpopmin"/></remarks>
public static Tuple<RedisKey, RedisValueWithScore>? BzPopMin(this IDatabase db, RedisKey[] keys, double timeout)
{
var command = CoreCommandBuilder.BzPopMin(keys, timeout);
return db.Execute(command).ToSortedSetPopResult();
}

/// <summary>
/// Syntactic sugar for <see cref="BzPopMin(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],double)"/>,
/// where only one key is used.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="key">The key to check.</param>
/// <param name="timeout">Server-side timeout for the wait. A value of <c>0</c> means to wait indefinitely.</param>
/// <returns>A sorted set entry paired with its score, together with the key it was popped from, or <c>null</c>
/// if the server timeout expires.</returns>
/// <remarks><seealso href="https://redis.io/commands/bzpopmin"/></remarks>
public static Tuple<RedisKey, RedisValueWithScore>? BzPopMin(this IDatabase db, RedisKey key, double timeout)
{
return BzPopMin(db, new[] { key }, timeout);
}


/// <summary>
/// The BZPOPMAX command.
/// <p/>
/// Removes and returns the entry with the highest score from the first non-empty sorted set in
/// <paramref name="keys"/>. If none of the sets contain elements, the call blocks on the server until elements
/// become available, or the given <paramref name="timeout"/> expires. A <paramref name="timeout"/> of <c>0</c>
/// means to wait indefinitely server-side. Returns <c>null</c> if the server timeout expires.
/// <p/>
/// When using this, pay attention to the timeout configured in the client, on the
/// <see cref="ConnectionMultiplexer"/>, which by default can be too small:
/// <code>
/// ConfigurationOptions configurationOptions = new ConfigurationOptions();
/// configurationOptions.SyncTimeout = 120000; // set a meaningful value here
/// configurationOptions.EndPoints.Add("localhost");
/// ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configurationOptions);
/// </code>
/// If the connection multiplexer timeout expires in the client, a <c>StackExchange.Redis.RedisTimeoutException</c>
/// is thrown.
/// <p/>
/// This is an extension method added to the <see cref="IDatabase"/> class, for convenience.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="keys">The keys to check.</param>
/// <param name="timeout">Server-side timeout for the wait. A value of <c>0</c> means to wait indefinitely.</param>
/// <returns>A sorted set entry paired with its score, together with the key it was popped from, or <c>null</c>
/// if the server timeout expires.</returns>
/// <remarks><seealso href="https://redis.io/commands/bzpopmax"/></remarks>
public static Tuple<RedisKey, RedisValueWithScore>? BzPopMax(this IDatabase db, RedisKey[] keys, double timeout)
{
var command = CoreCommandBuilder.BzPopMax(keys, timeout);
return db.Execute(command).ToSortedSetPopResult();
}

/// <summary>
/// Syntactic sugar for <see cref="BzPopMax(StackExchange.Redis.IDatabase,StackExchange.Redis.RedisKey[],double)"/>,
/// where only one key is used.
/// </summary>
/// <param name="db">The <see cref="IDatabase"/> class where this extension method is applied.</param>
/// <param name="key">The key to check.</param>
/// <param name="timeout">Server-side timeout for the wait. A value of <c>0</c> means to wait indefinitely.</param>
/// <returns>A sorted set entry paired with its score, together with the key it was popped from, or <c>null</c>
/// if the server timeout expires.</returns>
/// <remarks><seealso href="https://redis.io/commands/bzpopmax"/></remarks>
public static Tuple<RedisKey, RedisValueWithScore>? BzPopMax(this IDatabase db, RedisKey key, double timeout)
{
return BzPopMax(db, new[] { key }, timeout);
}
}
}
35 changes: 35 additions & 0 deletions src/NRedisStack/CoreCommands/DataTypes/MinMaxModifier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using StackExchange.Redis;

namespace NRedisStack.Core.DataTypes;

/// <summary>
/// Modifier that can be used for sorted set commands, where a MIN/MAX argument is expected by the Redis server.
/// </summary>
public enum MinMaxModifier
{
/// <summary>
/// Maps to the <c>MIN</c> argument on the Redis server.
/// </summary>
Min,

/// <summary>
/// Maps to the <c>MAX</c> argument on the Redis server.
/// </summary>
Max
}

/// <summary>
/// Conversion methods from/to other common data types.
/// </summary>
public static class MinMaxModifierExtensions
{
/// <summary>
/// Convert from <see cref="Order"/> to <see cref="MinMaxModifier"/>.
/// </summary>
public static MinMaxModifier ToMinMax(this Order order) => order switch
{
Order.Ascending => MinMaxModifier.Min,
Order.Descending => MinMaxModifier.Max,
_ => throw new ArgumentOutOfRangeException(nameof(order))
};
}
31 changes: 31 additions & 0 deletions src/NRedisStack/CoreCommands/DataTypes/RedisValueWithScore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using StackExchange.Redis;

namespace NRedisStack.Core.DataTypes;

/// <summary>
/// Holds a <see cref="RedisValue"/> with an associated score.
/// Used when working with sorted sets.
/// </summary>
public struct RedisValueWithScore
{
/// <summary>
/// Pair a <see cref="RedisValue"/> with a numeric score.
/// </summary>
public RedisValueWithScore(RedisValue value, double score)
{
Value = value;
Score = score;
}

/// <summary>
/// The value of an item stored in a sorted set. For example, in the Redis command
/// <c>ZADD my-set 5.1 my-value</c>, the value is <c>my-value</c>.
/// </summary>
public RedisValue Value { get; }

/// <summary>
/// The score of an item stored in a sorted set. For example, in the Redis command
/// <c>ZADD my-set 5.1 my-value</c>, the score is <c>5.1</c>.
/// </summary>
public double Score { get; }
}
5 changes: 4 additions & 1 deletion src/NRedisStack/CoreCommands/Literals/CommandArgs.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
namespace NRedisStack.Core.Literals
{
internal class CoreArgs
internal static class CoreArgs
{
public const string COUNT = "COUNT";
public const string lib_name = "LIB-NAME";
public const string lib_ver = "LIB-VER";
public const string MAX = "MAX";
public const string MIN = "MIN";
}
}
5 changes: 4 additions & 1 deletion src/NRedisStack/CoreCommands/Literals/Commands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ namespace NRedisStack.Core.Literals
/// <summary>
/// Redis Core command literals
/// </summary>
internal class RedisCoreCommands
internal static class RedisCoreCommands
{
public const string BZMPOP = "BZMPOP";
public const string BZPOPMAX = "BZPOPMAX";
public const string BZPOPMIN = "BZPOPMIN";
public const string CLIENT = "CLIENT";
public const string SETINFO = "SETINFO";
}
Expand Down
Loading

0 comments on commit 410c474

Please # to comment.