Skip to content

Commit da0ebcb

Browse files
authored
Make sure queue group is written to wire (#414)
* Make sure queue group is written to wire * Test fix
1 parent c56cbdd commit da0ebcb

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

src/NATS.Client.Core/Commands/ProtocolWriter.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void WriteSubscribe(IBufferWriter<byte> writer, int sid, string subject,
114114

115115
if (queueGroup != null)
116116
{
117-
written = _subjectEncoding.GetBytes(subject, span);
117+
written = _subjectEncoding.GetBytes(queueGroup, span);
118118
span[written] = (byte)' ';
119119
size += written + 1;
120120
span = span[(written + 1)..];

tests/NATS.Client.Core.Tests/ProtocolTest.cs

+26
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,32 @@ await Retry.Until(
103103
proxy.Dispose();
104104
}
105105

106+
[Fact]
107+
public async Task Subscription_queue_group()
108+
{
109+
await using var server = NatsServer.Start();
110+
var (nats, proxy) = server.CreateProxiedClientConnection();
111+
112+
await using var sub1 = await nats.SubscribeCoreAsync<int>("foo", queueGroup: "group1");
113+
await using var sub2 = await nats.SubscribeCoreAsync<int>("foo", queueGroup: "group2");
114+
115+
await Retry.Until(
116+
"frames collected",
117+
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("SUB foo")) == 2);
118+
119+
var frames = proxy.ClientFrames.Select(f => f.Message).ToList();
120+
121+
foreach (var frame in frames)
122+
{
123+
_output.WriteLine($"frame: {frame}");
124+
}
125+
126+
Assert.StartsWith("SUB foo group1 ", frames[0]);
127+
Assert.StartsWith("SUB foo group2 ", frames[1]);
128+
129+
await nats.DisposeAsync();
130+
}
131+
106132
[Fact]
107133
public async Task Publish_empty_message_for_notifications()
108134
{

0 commit comments

Comments
 (0)