Skip to content

Commit 077e342

Browse files
authored
Add support for Direct Get API with key in subject (#771)
* Add support for Direct Get API with key in subject * Fix format
1 parent 18fe3a3 commit 077e342

File tree

3 files changed

+109
-8
lines changed

3 files changed

+109
-8
lines changed

src/NATS.Client.KeyValueStore/NatsKVContext.cs

+28-6
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,30 @@ public enum NatsKVStorageType
1919
/// </summary>
2020
public class NatsKVContext : INatsKVContext
2121
{
22-
private const string KvStreamNamePrefix = "KV_";
22+
internal const string KvStreamNamePrefix = "KV_";
2323
private static readonly int KvStreamNamePrefixLen = KvStreamNamePrefix.Length;
2424
private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled);
25+
private readonly NatsKVOpts _opts;
2526

2627
/// <summary>
2728
/// Create a new Key Value Store context
2829
/// </summary>
2930
/// <param name="context">JetStream context</param>
30-
public NatsKVContext(INatsJSContext context) => JetStreamContext = context;
31+
/// <param name="opts">Context options</param>
32+
public NatsKVContext(INatsJSContext context, NatsKVOpts opts)
33+
{
34+
JetStreamContext = context;
35+
_opts = opts;
36+
}
37+
38+
/// <summary>
39+
/// Create a new Key Value Store context
40+
/// </summary>
41+
/// <param name="context">JetStream context</param>
42+
public NatsKVContext(INatsJSContext context)
43+
: this(context, NatsKVOpts.Default)
44+
{
45+
}
3146

3247
/// <inheritdoc />
3348
public INatsJSContext JetStreamContext { get; }
@@ -45,7 +60,7 @@ public async ValueTask<INatsKVStore> CreateStoreAsync(NatsKVConfig config, Cance
4560

4661
var stream = await JetStreamContext.CreateStreamAsync(streamConfig, cancellationToken);
4762

48-
return new NatsKVStore(config.Bucket, JetStreamContext, stream);
63+
return new NatsKVStore(config.Bucket, JetStreamContext, stream, _opts);
4964
}
5065

5166
/// <inheritdoc />
@@ -61,7 +76,7 @@ public async ValueTask<INatsKVStore> GetStoreAsync(string bucket, CancellationTo
6176
}
6277

6378
// TODO: KV mirror
64-
return new NatsKVStore(bucket, JetStreamContext, stream);
79+
return new NatsKVStore(bucket, JetStreamContext, stream, _opts);
6580
}
6681

6782
/// <inheritdoc />
@@ -73,7 +88,7 @@ public async ValueTask<INatsKVStore> UpdateStoreAsync(NatsKVConfig config, Cance
7388

7489
var stream = await JetStreamContext.UpdateStreamAsync(streamConfig, cancellationToken);
7590

76-
return new NatsKVStore(config.Bucket, JetStreamContext, stream);
91+
return new NatsKVStore(config.Bucket, JetStreamContext, stream, _opts);
7792
}
7893

7994
/// <inheritdoc />
@@ -85,7 +100,7 @@ public async ValueTask<INatsKVStore> CreateOrUpdateStoreAsync(NatsKVConfig confi
85100

86101
var stream = await JetStreamContext.CreateOrUpdateStreamAsync(streamConfig, cancellationToken);
87102

88-
return new NatsKVStore(config.Bucket, JetStreamContext, stream);
103+
return new NatsKVStore(config.Bucket, JetStreamContext, stream, _opts);
89104
}
90105

91106
/// <inheritdoc />
@@ -252,3 +267,10 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config)
252267
return streamConfig;
253268
}
254269
}
270+
271+
public class NatsKVOpts
272+
{
273+
public static readonly NatsKVOpts Default = new();
274+
275+
public bool UseDirectGetApiWithKeysInSubject { get; init; }
276+
}

src/NATS.Client.KeyValueStore/NatsKVStore.cs

+18-2
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,18 @@ public class NatsKVStore : INatsKVStore
5656
private static readonly NatsKVException KeyCannotStartOrEndWithPeriodException = new("Key cannot start or end with a period");
5757
private static readonly NatsKVException KeyContainsInvalidCharactersException = new("Key contains invalid characters");
5858
private readonly INatsJSStream _stream;
59+
private readonly NatsKVOpts _opts;
5960
private readonly string _kvBucket;
61+
private readonly string _streamName;
6062

61-
internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream)
63+
internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream, NatsKVOpts opts)
6264
{
6365
Bucket = bucket;
6466
JetStreamContext = context;
6567
_stream = stream;
68+
_opts = opts;
6669
_kvBucket = $"$KV.{Bucket}.";
70+
_streamName = NatsKVContext.KvStreamNamePrefix + Bucket;
6771
}
6872

6973
/// <inheritdoc />
@@ -333,7 +337,19 @@ public async ValueTask<NatsResult<NatsKVEntry<T>>> TryGetEntryAsync<T>(string ke
333337

334338
if (_stream.Info.Config.AllowDirect)
335339
{
336-
var direct = await _stream.GetDirectAsync<T>(request, serializer, cancellationToken);
340+
NatsMsg<T> direct;
341+
if (_opts.UseDirectGetApiWithKeysInSubject)
342+
{
343+
direct = await JetStreamContext.Connection.RequestAsync<object, T>(
344+
subject: $"{JetStreamContext.Opts.Prefix}.DIRECT.GET.{_streamName}.{keySubject}",
345+
data: null,
346+
replySerializer: serializer,
347+
cancellationToken: cancellationToken);
348+
}
349+
else
350+
{
351+
direct = await _stream.GetDirectAsync<T>(request, serializer, cancellationToken);
352+
}
337353

338354
if (direct is { Headers: { } headers } msg)
339355
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using NATS.Client.Core.Tests;
2+
3+
namespace NATS.Client.KeyValueStore.Tests;
4+
5+
public class DirectGetTest(ITestOutputHelper output)
6+
{
7+
[Fact]
8+
public async Task API_subject_test()
9+
{
10+
await using var server = await NatsServer.StartJSAsync();
11+
var (nats1, proxy) = server.CreateProxiedClientConnection();
12+
await using var nats = nats1;
13+
14+
var js = new NatsJSContext(nats);
15+
16+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
17+
var cancellationToken = cts.Token;
18+
19+
// default
20+
{
21+
var kv = new NatsKVContext(js);
22+
23+
var store = await kv.CreateStoreAsync(new NatsKVConfig("b1"), cancellationToken);
24+
await store.PutAsync("x", 1, cancellationToken: cancellationToken);
25+
await store.PutAsync("x", 2, cancellationToken: cancellationToken);
26+
27+
await proxy.FlushFramesAsync(nats);
28+
29+
var entry = await store.GetEntryAsync<int>("x", cancellationToken: cancellationToken);
30+
Assert.Equal(2, entry.Value);
31+
32+
var proto = proxy.ClientFrames[0].Message;
33+
Assert.StartsWith("PUB $JS.API.DIRECT.GET.KV_b1 _INBOX.", proto);
34+
Assert.EndsWith("""␍␊{"last_by_subj":"$KV.b1.x"}""", proto);
35+
foreach (var proxyFrame in proxy.ClientFrames)
36+
{
37+
output.WriteLine(proxyFrame.Message);
38+
}
39+
}
40+
41+
// key in api subject
42+
{
43+
var kv = new NatsKVContext(js, new NatsKVOpts { UseDirectGetApiWithKeysInSubject = true });
44+
45+
var store = await kv.CreateStoreAsync(new NatsKVConfig("b1"), cancellationToken);
46+
await store.PutAsync("x", 1, cancellationToken: cancellationToken);
47+
await store.PutAsync("x", 2, cancellationToken: cancellationToken);
48+
49+
await proxy.FlushFramesAsync(nats);
50+
51+
var entry = await store.GetEntryAsync<int>("x", cancellationToken: cancellationToken);
52+
Assert.Equal(2, entry.Value);
53+
54+
var proto = proxy.ClientFrames[0].Message;
55+
Assert.StartsWith("PUB $JS.API.DIRECT.GET.KV_b1.$KV.b1.x _INBOX.", proto);
56+
Assert.EndsWith(""" 0␍␊""", proto);
57+
foreach (var proxyFrame in proxy.ClientFrames)
58+
{
59+
output.WriteLine(proxyFrame.Message);
60+
}
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)