From 6c5942e32aa5b8d4ef33b63dacc135cadcfaa4c2 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sat, 10 Sep 2022 08:14:47 -0400 Subject: [PATCH 1/8] Support v2 JS.ACK format See nats-io/nats-and-architecture-design#49 Signed-off-by: Byron Ruth --- nats/aio/msg.py | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/nats/aio/msg.py b/nats/aio/msg.py index ab319c7a..a8e6c3ad 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -147,20 +147,36 @@ def metadata(self) -> "Metadata": if metadata is not None: return metadata - # TODO: Support V2TokenCount style with domains. tokens = Msg.Metadata._get_metadata_fields(msg.reply) - t = datetime.datetime.fromtimestamp(int(tokens[7]) / 1_000_000_000.0) - metadata = Msg.Metadata( - sequence=Msg.Metadata.SequencePair( - stream=int(tokens[5]), - consumer=int(tokens[6]), - ), - num_delivered=int(tokens[4]), - num_pending=int(tokens[8]), - timestamp=t, - stream=tokens[2], - consumer=tokens[3], - ) + + if len(toks) == V1TokenCount: + t = datetime.datetime.fromtimestamp(int(tokens[7]) / 1_000_000_000.0) + metadata = Msg.Metadata( + sequence=Msg.Metadata.SequencePair( + stream=int(tokens[5]), + consumer=int(tokens[6]), + ), + num_delivered=int(tokens[4]), + num_pending=int(tokens[8]), + timestamp=t, + stream=tokens[2], + consumer=tokens[3], + ) + else: + t = datetime.datetime.fromtimestamp(int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0) + metadata = Msg.Metadata( + sequence=Msg.Metadata.SequencePair( + stream=int(tokens[Msg.Ack.StreamSeq]), + consumer=int(tokens[Msg.Ack.ConsumerSeq]), + ), + num_delivered=int(tokens[Msg.Ack.NumDelivered]), + num_pending=int(tokens[Msg.Ack.NumPending]), + timestamp=t, + stream=tokens[Msg.Ack.Stream], + consumer=tokens[Msg.Ack.Consumer], + domain=tokens[Msg.Ack.Domain], + ) + msg._metadata = metadata return metadata @@ -193,6 +209,7 @@ class Metadata: timestamp: Optional[datetime.datetime] = None stream: Optional[str] = None consumer: Optional[str] = None + domain: Optional[str] = None @dataclass class SequencePair: @@ -208,6 +225,7 @@ def _get_metadata_fields(cls, reply: Optional[str]) -> List[str]: raise NotJSMessageError tokens = reply.split('.') if len(tokens) != Msg.Ack.V1TokenCount or \ + len(tokens) != Msg.Ack.V2TokenCount or \ tokens[0] != Msg.Ack.Prefix0 or \ tokens[1] != Msg.Ack.Prefix1: raise NotJSMessageError From 167418bab2d4948e796a550f339730656a6d0917 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sat, 10 Sep 2022 08:20:00 -0400 Subject: [PATCH 2/8] Fix lint Signed-off-by: Byron Ruth --- nats/aio/msg.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/nats/aio/msg.py b/nats/aio/msg.py index a8e6c3ad..bf46618c 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -150,7 +150,9 @@ def metadata(self) -> "Metadata": tokens = Msg.Metadata._get_metadata_fields(msg.reply) if len(toks) == V1TokenCount: - t = datetime.datetime.fromtimestamp(int(tokens[7]) / 1_000_000_000.0) + t = datetime.datetime.fromtimestamp( + int(tokens[7]) / 1_000_000_000.0 + ) metadata = Msg.Metadata( sequence=Msg.Metadata.SequencePair( stream=int(tokens[5]), @@ -163,7 +165,9 @@ def metadata(self) -> "Metadata": consumer=tokens[3], ) else: - t = datetime.datetime.fromtimestamp(int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0) + t = datetime.datetime.fromtimestamp( + int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0 + ) metadata = Msg.Metadata( sequence=Msg.Metadata.SequencePair( stream=int(tokens[Msg.Ack.StreamSeq]), From e906809c9cced0e5d3a11010baa56dd0efb510dd Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sat, 10 Sep 2022 08:22:55 -0400 Subject: [PATCH 3/8] Represent "no domain" as empty string Signed-off-by: Byron Ruth --- nats/aio/msg.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/nats/aio/msg.py b/nats/aio/msg.py index bf46618c..2f84a211 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -168,6 +168,13 @@ def metadata(self) -> "Metadata": t = datetime.datetime.fromtimestamp( int(tokens[Msg.Ack.Timestamp]) / 1_000_000_000.0 ) + + # Underscore indicate no domain is set. Expose as empty string + # to client. + domain = tokens[Msg.Ack.Domain] + if domain == "_": + domain = "" + metadata = Msg.Metadata( sequence=Msg.Metadata.SequencePair( stream=int(tokens[Msg.Ack.StreamSeq]), @@ -178,7 +185,7 @@ def metadata(self) -> "Metadata": timestamp=t, stream=tokens[Msg.Ack.Stream], consumer=tokens[Msg.Ack.Consumer], - domain=tokens[Msg.Ack.Domain], + domain=domain, ) msg._metadata = metadata From bd9cfa235c1f7847c970409bafebb33ef811a8ad Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sat, 10 Sep 2022 08:35:15 -0400 Subject: [PATCH 4/8] Fix logic Signed-off-by: Byron Ruth --- nats/aio/msg.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/nats/aio/msg.py b/nats/aio/msg.py index 2f84a211..220192ef 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -232,12 +232,12 @@ class SequencePair: @classmethod def _get_metadata_fields(cls, reply: Optional[str]) -> List[str]: - if reply is None or reply == '': + if not reply: raise NotJSMessageError tokens = reply.split('.') - if len(tokens) != Msg.Ack.V1TokenCount or \ - len(tokens) != Msg.Ack.V2TokenCount or \ - tokens[0] != Msg.Ack.Prefix0 or \ - tokens[1] != Msg.Ack.Prefix1: - raise NotJSMessageError - return tokens + if (len(tokens) == Msg.Ack.V1TokenCount or + len(tokens) == Msg.Ack.V2TokenCount) and \ + tokens[0] == Msg.Ack.Prefix0 and \ + tokens[1] == Msg.Ack.Prefix1: + return tokens + raise NotJSMessageError From a38d0f3678799aa560907be5f2260566e2014532 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sat, 10 Sep 2022 08:40:23 -0400 Subject: [PATCH 5/8] Fix wrong var name Signed-off-by: Byron Ruth --- nats/aio/msg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats/aio/msg.py b/nats/aio/msg.py index 220192ef..f0149f67 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -149,7 +149,7 @@ def metadata(self) -> "Metadata": tokens = Msg.Metadata._get_metadata_fields(msg.reply) - if len(toks) == V1TokenCount: + if len(tokens) == V1TokenCount: t = datetime.datetime.fromtimestamp( int(tokens[7]) / 1_000_000_000.0 ) From e5d317c18595f37a9b43c570c5f8678093bf2eda Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sat, 10 Sep 2022 08:46:52 -0400 Subject: [PATCH 6/8] Fix bad reference Signed-off-by: Byron Ruth --- nats/aio/msg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats/aio/msg.py b/nats/aio/msg.py index f0149f67..a049f130 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -149,7 +149,7 @@ def metadata(self) -> "Metadata": tokens = Msg.Metadata._get_metadata_fields(msg.reply) - if len(tokens) == V1TokenCount: + if len(tokens) == Msg.Ack.V1TokenCount: t = datetime.datetime.fromtimestamp( int(tokens[7]) / 1_000_000_000.0 ) From 2507ba68666da4ef900aab415f44a94d3aab1b45 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Sat, 10 Sep 2022 21:55:49 -0700 Subject: [PATCH 7/8] Added test, tweaked token count Signed-off-by: Waldemar Quevedo --- nats/aio/msg.py | 2 +- tests/test_js.py | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/nats/aio/msg.py b/nats/aio/msg.py index a049f130..46b75184 100644 --- a/nats/aio/msg.py +++ b/nats/aio/msg.py @@ -236,7 +236,7 @@ def _get_metadata_fields(cls, reply: Optional[str]) -> List[str]: raise NotJSMessageError tokens = reply.split('.') if (len(tokens) == Msg.Ack.V1TokenCount or - len(tokens) == Msg.Ack.V2TokenCount) and \ + len(tokens) >= Msg.Ack.V2TokenCount-1) and \ tokens[0] == Msg.Ack.Prefix0 and \ tokens[1] == Msg.Ack.Prefix1: return tokens diff --git a/tests/test_js.py b/tests/test_js.py index c4d819d8..40484d94 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -8,6 +8,7 @@ import pytest import nats import nats.js.api +from nats.aio.msg import Msg from nats.aio.client import Client as NATS, __version__ from nats.aio.errors import * from nats.errors import * @@ -1246,6 +1247,45 @@ async def cb3(msg): class AckPolicyTest(SingleJetStreamServerTestCase): + @async_test + async def test_ack_v2_tokens(self): + nc = await nats.connect() + + # At least 11 tokens case + msg = Msg(nc) + domain = "foo_domain" + account_hash = "bar_account" + stream_name = "stream" + consumer_name = "consumer" + num_delivered = 1 + stream_sequence = 2 + consumer_sequence = 2 + timestamp = 1662856107340506000 + num_pending = 20 + msg.reply = f"$JS.ACK.{domain}.{account_hash}.{stream_name}.{consumer_name}.{num_delivered}.{stream_sequence}.{consumer_sequence}.{timestamp}.{num_pending}" + meta = msg.metadata + assert meta.domain == domain + assert meta.stream == stream_name + assert meta.consumer == consumer_name + assert meta.sequence.stream == stream_sequence + assert meta.sequence.consumer == consumer_sequence + assert meta.num_delivered == num_delivered + assert meta.num_pending == num_pending + assert meta.timestamp == datetime.datetime(2022, 9, 10, 17, 28, 27, 340506) + + # Complete v2 tokens (last one discarded) + msg = Msg(nc) + msg.reply = f"$JS.ACK.{domain}.{account_hash}.{stream_name}.{consumer_name}.{num_delivered}.{stream_sequence}.{consumer_sequence}.{timestamp}.{num_pending}.123456" + meta = msg.metadata + assert meta.domain == domain + assert meta.stream == stream_name + assert meta.consumer == consumer_name + assert meta.sequence.stream == stream_sequence + assert meta.sequence.consumer == consumer_sequence + assert meta.num_delivered == num_delivered + assert meta.num_pending == num_pending + assert meta.timestamp == datetime.datetime(2022, 9, 10, 17, 28, 27, 340506) + @async_test async def test_double_acking_pull_subscribe(self): nc = await nats.connect() From 4aaaf0e8e5d695f178660f4729aa6e363e652c41 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Sat, 10 Sep 2022 22:04:23 -0700 Subject: [PATCH 8/8] Add flush to no echo test Signed-off-by: Waldemar Quevedo --- tests/test_client.py | 4 +++- tests/test_js.py | 25 +++++++++++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index 0396b63f..e8f38f63 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -336,7 +336,7 @@ async def subscription_handler(msg): async def subscription_handler2(msg): msgs2.append(msg) - if len(msgs2) >= 10: + if len(msgs2) >= 1: fut.set_result(True) await nc.connect(no_echo=True) @@ -344,6 +344,8 @@ async def subscription_handler2(msg): sub = await nc.subscribe("foo", cb=subscription_handler) sub2 = await nc2.subscribe("foo", cb=subscription_handler2) + await nc.flush() + await nc2.flush() payload = b'hello world' for i in range(0, 10): diff --git a/tests/test_js.py b/tests/test_js.py index 40484d94..736510d8 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -1271,7 +1271,18 @@ async def test_ack_v2_tokens(self): assert meta.sequence.consumer == consumer_sequence assert meta.num_delivered == num_delivered assert meta.num_pending == num_pending - assert meta.timestamp == datetime.datetime(2022, 9, 10, 17, 28, 27, 340506) + print(meta.timestamp) + assert meta.timestamp.astimezone(datetime.timezone.utc + ) == datetime.datetime( + 2022, + 9, + 11, + 0, + 28, + 27, + 340506, + tzinfo=datetime.timezone.utc + ) # Complete v2 tokens (last one discarded) msg = Msg(nc) @@ -1284,7 +1295,17 @@ async def test_ack_v2_tokens(self): assert meta.sequence.consumer == consumer_sequence assert meta.num_delivered == num_delivered assert meta.num_pending == num_pending - assert meta.timestamp == datetime.datetime(2022, 9, 10, 17, 28, 27, 340506) + assert meta.timestamp.astimezone(datetime.timezone.utc + ) == datetime.datetime( + 2022, + 9, + 11, + 0, + 28, + 27, + 340506, + tzinfo=datetime.timezone.utc + ) @async_test async def test_double_acking_pull_subscribe(self):