From 55489ba40eab1e584504e6dce06af8512878e8c7 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Thu, 23 Jan 2025 06:21:32 +0100 Subject: [PATCH] Add `updates_only` mode to object store watch --- nats/js/object_store.py | 17 ++++++++++-- tests/test_js.py | 61 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/nats/js/object_store.py b/nats/js/object_store.py index 441e72d3..bc00ff26 100644 --- a/nats/js/object_store.py +++ b/nats/js/object_store.py @@ -468,9 +468,16 @@ async def watch( ignore_deletes=False, include_history=False, meta_only=False, + updates_only=False, ) -> ObjectWatcher: """ watch for changes in the underlying store and receive meta information updates. + + :param ignore_deletes: Whether to ignore deleted objects in the updates + :param include_history: Whether to include historical values + :param meta_only: Whether to only receive metadata + :param updates_only: Whether to only receive updates after the current state + :return: An ObjectWatcher instance """ all_meta = OBJ_ALL_META_PRE_TEMPLATE.format(bucket=self._name, ) watcher = ObjectStore.ObjectWatcher(self) @@ -484,7 +491,8 @@ async def watch_updates(msg): # When there are no more updates send an empty marker # to signal that it is done, this will unblock iterators - if (not watcher._init_done) and meta.num_pending == 0: + # Only send None marker when not in updates_only mode + if (not watcher._init_done) and meta.num_pending == 0 and not updates_only: watcher._init_done = True await watcher._updates.put(None) @@ -492,10 +500,13 @@ async def watch_updates(msg): await self._js.get_last_msg(self._stream, all_meta) except NotFoundError: watcher._init_done = True - await watcher._updates.put(None) + if not updates_only: + await watcher._updates.put(None) deliver_policy = None - if not include_history: + if updates_only: + deliver_policy = api.DeliverPolicy.NEW + elif not include_history: deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT watcher._sub = await self._js.subscribe( diff --git a/tests/test_js.py b/tests/test_js.py index fce4dd44..5e4c2c27 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -4494,3 +4494,64 @@ async def test_add_stream_invalid_names(self): ), ): await js.add_stream(name=name) + + @async_test + async def test_object_watch_updates_only(self): + errors = [] + + async def error_handler(e): + print("Error:", e, type(e)) + errors.append(e) + + nc = await nats.connect(error_cb=error_handler) + js = nc.jetstream() + + obs = await js.create_object_store( + "TEST_FILES", + config=nats.js.api.ObjectStoreConfig(description="updates_only_test", ), + ) + + # Put some initial objects + await obs.put("A", b"A") + await obs.put("B", b"B") + await obs.put("C", b"C") + + # Start watching with updates_only=True + watcher = await obs.watch(updates_only=True) + + # Since updates_only=True, we should not receive any initial state + # and no None marker since there are existing objects + with pytest.raises(asyncio.TimeoutError): + await watcher.updates(timeout=1) + + # New updates should be received + await obs.put("D", b"D") + e = await watcher.updates() + assert e.name == "D" + assert e.bucket == "TEST_FILES" + assert e.size == 1 + assert e.chunks == 1 + + # Updates to existing objects should be received + await obs.put("A", b"AA") + e = await watcher.updates() + assert e.name == "A" + assert e.bucket == "TEST_FILES" + assert e.size == 2 + + # Deletes should be received + await obs.delete("B") + e = await watcher.updates() + assert e.name == "B" + assert e.deleted == True + + # Meta updates should be received + res = await obs.get("C") + to_update_meta = res.info.meta + to_update_meta.description = "changed" + await obs.update_meta("C", to_update_meta) + e = await watcher.updates() + assert e.name == "C" + assert e.description == "changed" + + await nc.close()