Skip to content

Commit 7dd7525

Browse files
authored
Fixed an exception which happens when PutAsync is used more than once and activity logging is enabled in main project (#675)
Added some tests.
1 parent 26ffb37 commit 7dd7525

File tree

3 files changed

+70
-3
lines changed

3 files changed

+70
-3
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,4 @@ nuget/*.unitypackage
119119

120120
# MacOS folder attributes
121121
.DS_Store
122+
/tests/NATS.Client.TestUtilities/Properties/launchSettings.json

src/NATS.Client.ObjectStore/NatsObjStore.cs

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ public class NatsObjStore : INatsObjStore
2525
private const string NatsRollup = "Nats-Rollup";
2626
private const string RollupSubject = "sub";
2727

28-
private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };
29-
3028
private readonly NatsObjContext _objContext;
3129
private readonly INatsJSStream _stream;
3230

@@ -603,7 +601,8 @@ public async ValueTask DeleteAsync(string key, CancellationToken cancellationTok
603601

604602
private async ValueTask PublishMeta(ObjectMetadata meta, CancellationToken cancellationToken)
605603
{
606-
var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer<ObjectMetadata>.Default, headers: NatsRollupHeaders, cancellationToken: cancellationToken);
604+
var natsRollupHeaders = new NatsHeaders { { NatsRollup, RollupSubject } };
605+
var ack = await JetStreamContext.PublishAsync(GetMetaSubject(meta.Name), meta, serializer: NatsObjJsonSerializer<ObjectMetadata>.Default, headers: natsRollupHeaders, cancellationToken: cancellationToken);
607606
ack.EnsureSuccess();
608607
}
609608

tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs

+67
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Diagnostics;
12
using System.Security.Cryptography;
23
using System.Text;
34
using NATS.Client.Core.Tests;
@@ -484,4 +485,70 @@ public async Task Put_get_serialization_when_default_serializer_is_not_used()
484485
var info = await store.GetInfoAsync("k1", cancellationToken: cancellationToken);
485486
Assert.Equal("k1", info.Name);
486487
}
488+
489+
[Fact]
490+
public async Task Put_with_activity()
491+
{
492+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
493+
var cancellationToken = cts.Token;
494+
495+
using var activitySource = new ActivitySource($"NATS-debug-{nameof(Put_with_activity)}");
496+
using var activityListener = new ActivityListener
497+
{
498+
ShouldListenTo = _ => true,
499+
SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
500+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
501+
};
502+
using var activity = activitySource.StartActivity(ActivityKind.Client);
503+
ActivitySource.AddActivityListener(activityListener);
504+
505+
await using var server = NatsServer.StartJS();
506+
await using var nats = server.CreateClientConnection();
507+
var js = new NatsJSContext(nats);
508+
var obj = new NatsObjContext(js);
509+
510+
var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);
511+
512+
var data = new byte[1024];
513+
Random.Shared.NextBytes(data);
514+
515+
const string filename = $"_tmp_test_file_{nameof(Put_with_activity)}.bin";
516+
await File.WriteAllBytesAsync(filename, data, cancellationToken);
517+
518+
await store.PutAsync("my/random/data_1.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
519+
}
520+
521+
[Fact]
522+
public async Task Put_multiple_times_with_activity()
523+
{
524+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
525+
var cancellationToken = cts.Token;
526+
527+
using var activitySource = new ActivitySource($"NATS-debug-{nameof(Put_multiple_times_with_activity)}");
528+
using var activityListener = new ActivityListener
529+
{
530+
ShouldListenTo = _ => true,
531+
SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
532+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
533+
};
534+
using var activity = activitySource.StartActivity(ActivityKind.Client);
535+
ActivitySource.AddActivityListener(activityListener);
536+
537+
await using var server = NatsServer.StartJS();
538+
await using var nats = server.CreateClientConnection();
539+
var js = new NatsJSContext(nats);
540+
var obj = new NatsObjContext(js);
541+
542+
var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);
543+
544+
var data = new byte[1024];
545+
Random.Shared.NextBytes(data);
546+
547+
const string filename = $"_tmp_test_file_{nameof(Put_multiple_times_with_activity)}.bin";
548+
await File.WriteAllBytesAsync(filename, data, cancellationToken);
549+
550+
await store.PutAsync("my/random/data_1.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
551+
await store.PutAsync("my/random/data_2.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
552+
await store.PutAsync("my/random/data_3.bin", File.OpenRead(filename), cancellationToken: cancellationToken);
553+
}
487554
}

0 commit comments

Comments
 (0)