Skip to content

Commit a79f41a

Browse files
authored
KV empty keys fix (#399)
1 parent c9be1cf commit a79f41a

File tree

3 files changed

+51
-4
lines changed

3 files changed

+51
-4
lines changed

src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs

+13-4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ internal class NatsKVWatcher<T> : IAsyncDisposable
5050
private ulong _sequenceConsumer;
5151
private string _consumer;
5252
private volatile NatsKVWatchSub<T>? _sub;
53+
private INatsJSConsumer? _initialConsumer;
5354

5455
public NatsKVWatcher(
5556
NatsJSContext context,
@@ -114,6 +115,12 @@ public NatsKVWatcher(
114115

115116
public ChannelReader<NatsKVEntry<T>> Entries => _entryChannel.Reader;
116117

118+
internal INatsJSConsumer InitialConsumer
119+
{
120+
get => _initialConsumer ?? throw new InvalidOperationException("Consumer not initialized");
121+
private set => _initialConsumer = value;
122+
}
123+
117124
internal string Consumer
118125
{
119126
get => Volatile.Read(ref _consumer);
@@ -136,10 +143,10 @@ public async ValueTask DisposeAsync()
136143
await _commandTask;
137144
}
138145

139-
internal ValueTask InitAsync()
146+
internal async ValueTask InitAsync()
140147
{
141148
Consumer = NewNuid();
142-
return CreatePushConsumer("init");
149+
InitialConsumer = await CreatePushConsumer("init");
143150
}
144151

145152
private ValueTask OnDisconnected(object? sender, NatsEventArgs args)
@@ -308,7 +315,7 @@ private async Task ConsumerCreateLoop()
308315
}
309316
}
310317

311-
private async ValueTask CreatePushConsumer(string origin)
318+
private async ValueTask<INatsJSConsumer> CreatePushConsumer(string origin)
312319
{
313320
if (_debug)
314321
{
@@ -375,7 +382,7 @@ private async ValueTask CreatePushConsumer(string origin)
375382
config.OptStartSeq = sequence + 1;
376383
}
377384

378-
await _context.CreateOrUpdateConsumerAsync(
385+
var consumer = await _context.CreateOrUpdateConsumerAsync(
379386
_stream,
380387
config,
381388
cancellationToken: _cancellationToken);
@@ -384,6 +391,8 @@ await _context.CreateOrUpdateConsumerAsync(
384391
{
385392
_logger.LogDebug(NatsKVLogEvents.NewConsumerCreated, "Created new consumer {Consumer} from {Origin}", Consumer, origin);
386393
}
394+
395+
return consumer;
387396
}
388397

389398
private string NewNuid()

src/NATS.Client.KeyValueStore/NatsKVStore.cs

+3
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,9 @@ public async IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = defau
430430
// Type doesn't matter here, we're just using the watcher to get the keys
431431
await using var watcher = await WatchInternalAsync<int>(">", serializer: default, opts, cancellationToken);
432432

433+
if (watcher.InitialConsumer.Info.NumPending == 0)
434+
yield break;
435+
433436
while (await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
434437
{
435438
while (watcher.Entries.TryRead(out var entry))

tests/NATS.Client.KeyValueStore.Tests/GetKeysTest.cs

+35
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,39 @@ public async Task Get_keys_should_not_hang_when_there_are_deleted_keys()
4545

4646
Assert.Equal(new List<string> { "k1", "k3" }, ks2);
4747
}
48+
49+
[Fact]
50+
public async Task Get_keys_when_empty()
51+
{
52+
const string bucket = "b1";
53+
var config = new NatsKVConfig(bucket) { History = 10 };
54+
55+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
56+
var cancellationToken = cts.Token;
57+
58+
await using var server = NatsServer.StartJS();
59+
await using var nats1 = server.CreateClientConnection();
60+
var js1 = new NatsJSContext(nats1);
61+
var kv1 = new NatsKVContext(js1);
62+
var store1 = await kv1.CreateStoreAsync(config, cancellationToken: cancellationToken);
63+
64+
var count = 0;
65+
await foreach (var k in store1.GetKeysAsync(cancellationToken: cancellationToken))
66+
{
67+
count++;
68+
}
69+
70+
Assert.Equal(0, count);
71+
72+
await store1.PutAsync("k1", 1, cancellationToken: cancellationToken);
73+
await store1.PutAsync("k2", 2, cancellationToken: cancellationToken);
74+
await store1.PutAsync("k3", 3, cancellationToken: cancellationToken);
75+
76+
await foreach (var k in store1.GetKeysAsync(cancellationToken: cancellationToken))
77+
{
78+
count++;
79+
}
80+
81+
Assert.Equal(3, count);
82+
}
4883
}

0 commit comments

Comments
 (0)