Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Support v2 JS.ACK format #351

Merged
merged 8 commits into from
Sep 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions nats/aio/msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,47 @@ def metadata(self) -> "Metadata":
if metadata is not None:
return metadata

# TODO: Support V2TokenCount style with domains.
tokens = Msg.Metadata._get_metadata_fields(msg.reply)
t = datetime.datetime.fromtimestamp(int(tokens[7]) / 1_000_000_000.0)
metadata = Msg.Metadata(
sequence=Msg.Metadata.SequencePair(
stream=int(tokens[5]),
consumer=int(tokens[6]),
),
num_delivered=int(tokens[4]),
num_pending=int(tokens[8]),
timestamp=t,
stream=tokens[2],
consumer=tokens[3],
)

if len(tokens) == Msg.Ack.V1TokenCount:
t = datetime.datetime.fromtimestamp(
int(tokens[7]) / 1_000_000_000.0
)
metadata = Msg.Metadata(
sequence=Msg.Metadata.SequencePair(
stream=int(tokens[5]),
consumer=int(tokens[6]),
),
num_delivered=int(tokens[4]),
num_pending=int(tokens[8]),
timestamp=t,
stream=tokens[2],
consumer=tokens[3],
)
else:
t = datetime.datetime.fromtimestamp(
int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0
)

# Underscore indicate no domain is set. Expose as empty string
# to client.
domain = tokens[Msg.Ack.Domain]
if domain == "_":
domain = ""

metadata = Msg.Metadata(
sequence=Msg.Metadata.SequencePair(
stream=int(tokens[Msg.Ack.StreamSeq]),
consumer=int(tokens[Msg.Ack.ConsumerSeq]),
),
num_delivered=int(tokens[Msg.Ack.NumDelivered]),
num_pending=int(tokens[Msg.Ack.NumPending]),
timestamp=t,
stream=tokens[Msg.Ack.Stream],
consumer=tokens[Msg.Ack.Consumer],
domain=domain,
)

msg._metadata = metadata
return metadata

Expand Down Expand Up @@ -193,6 +220,7 @@ class Metadata:
timestamp: Optional[datetime.datetime] = None
stream: Optional[str] = None
consumer: Optional[str] = None
domain: Optional[str] = None

@dataclass
class SequencePair:
Expand All @@ -204,11 +232,12 @@ class SequencePair:

@classmethod
def _get_metadata_fields(cls, reply: Optional[str]) -> List[str]:
if reply is None or reply == '':
if not reply:
raise NotJSMessageError
tokens = reply.split('.')
if len(tokens) != Msg.Ack.V1TokenCount or \
tokens[0] != Msg.Ack.Prefix0 or \
tokens[1] != Msg.Ack.Prefix1:
raise NotJSMessageError
return tokens
if (len(tokens) == Msg.Ack.V1TokenCount or
len(tokens) >= Msg.Ack.V2TokenCount-1) and \
tokens[0] == Msg.Ack.Prefix0 and \
tokens[1] == Msg.Ack.Prefix1:
return tokens
raise NotJSMessageError
4 changes: 3 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,16 @@ async def subscription_handler(msg):

async def subscription_handler2(msg):
msgs2.append(msg)
if len(msgs2) >= 10:
if len(msgs2) >= 1:
fut.set_result(True)

await nc.connect(no_echo=True)
await nc2.connect(no_echo=False)

sub = await nc.subscribe("foo", cb=subscription_handler)
sub2 = await nc2.subscribe("foo", cb=subscription_handler2)
await nc.flush()
await nc2.flush()

payload = b'hello world'
for i in range(0, 10):
Expand Down
61 changes: 61 additions & 0 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
import nats
import nats.js.api
from nats.aio.msg import Msg
from nats.aio.client import Client as NATS, __version__
from nats.aio.errors import *
from nats.errors import *
Expand Down Expand Up @@ -1246,6 +1247,66 @@ async def cb3(msg):

class AckPolicyTest(SingleJetStreamServerTestCase):

@async_test
async def test_ack_v2_tokens(self):
nc = await nats.connect()

# At least 11 tokens case
msg = Msg(nc)
domain = "foo_domain"
account_hash = "bar_account"
stream_name = "stream"
consumer_name = "consumer"
num_delivered = 1
stream_sequence = 2
consumer_sequence = 2
timestamp = 1662856107340506000
num_pending = 20
msg.reply = f"$JS.ACK.{domain}.{account_hash}.{stream_name}.{consumer_name}.{num_delivered}.{stream_sequence}.{consumer_sequence}.{timestamp}.{num_pending}"
meta = msg.metadata
assert meta.domain == domain
assert meta.stream == stream_name
assert meta.consumer == consumer_name
assert meta.sequence.stream == stream_sequence
assert meta.sequence.consumer == consumer_sequence
assert meta.num_delivered == num_delivered
assert meta.num_pending == num_pending
print(meta.timestamp)
assert meta.timestamp.astimezone(datetime.timezone.utc
) == datetime.datetime(
2022,
9,
11,
0,
28,
27,
340506,
tzinfo=datetime.timezone.utc
)

# Complete v2 tokens (last one discarded)
msg = Msg(nc)
msg.reply = f"$JS.ACK.{domain}.{account_hash}.{stream_name}.{consumer_name}.{num_delivered}.{stream_sequence}.{consumer_sequence}.{timestamp}.{num_pending}.123456"
meta = msg.metadata
assert meta.domain == domain
assert meta.stream == stream_name
assert meta.consumer == consumer_name
assert meta.sequence.stream == stream_sequence
assert meta.sequence.consumer == consumer_sequence
assert meta.num_delivered == num_delivered
assert meta.num_pending == num_pending
assert meta.timestamp.astimezone(datetime.timezone.utc
) == datetime.datetime(
2022,
9,
11,
0,
28,
27,
340506,
tzinfo=datetime.timezone.utc
)

@async_test
async def test_double_acking_pull_subscribe(self):
nc = await nats.connect()
Expand Down