Skip to content

Commit 57e8b06

Browse files
committedFeb 3, 2025
Fix object store watch to use NEW rather than ALL
1 parent 7e7883e commit 57e8b06

File tree

2 files changed

+74
-1
lines changed

2 files changed

+74
-1
lines changed
 

‎nats/js/object_store.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -495,8 +495,12 @@ async def watch_updates(msg):
495495
await watcher._updates.put(None)
496496

497497
deliver_policy = None
498-
if not include_history:
498+
if include_history:
499499
deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT
500+
else:
501+
# Aligning with nats.js on this one, nats.go uses ALL if history is not included
502+
# But if history is not desired the watch should only be giving notifications on new entries
503+
deliver_policy = api.DeliverPolicy.NEW
500504

501505
watcher._sub = await self._js.subscribe(
502506
all_meta,

‎tests/test_js.py

+69
Original file line numberDiff line numberDiff line change
@@ -3912,6 +3912,75 @@ async def error_handler(e):
39123912

39133913
await nc.close()
39143914

3915+
@async_test
3916+
async def test_object_watch_include_history(self):
3917+
errors = []
3918+
3919+
async def error_handler(e):
3920+
print("Error:", e, type(e))
3921+
errors.append(e)
3922+
3923+
nc = await nats.connect(error_cb=error_handler)
3924+
js = nc.jetstream()
3925+
3926+
obs = await js.create_object_store(
3927+
"TEST_FILES",
3928+
config=nats.js.api.ObjectStoreConfig(description="multi_files"),
3929+
)
3930+
3931+
# Put objects before starting watcher
3932+
await obs.put("A", b"A")
3933+
await obs.put("B", b"B")
3934+
await obs.put("C", b"C")
3935+
3936+
# ------------------------------------
3937+
# Case 1: Watcher with include_history=True
3938+
# ------------------------------------
3939+
watcher = await obs.watch(include_history=True)
3940+
3941+
# Should receive historical updates immediately
3942+
e = await watcher.updates()
3943+
assert e.name == "A"
3944+
assert e.bucket == "TEST_FILES"
3945+
3946+
e = await watcher.updates()
3947+
assert e.name == "B"
3948+
3949+
e = await watcher.updates()
3950+
assert e.name == "C"
3951+
3952+
e = await watcher.updates()
3953+
assert e is None
3954+
3955+
# No new updates yet, expect timeout
3956+
with pytest.raises(asyncio.TimeoutError):
3957+
await watcher.updates(timeout=1)
3958+
3959+
# ------------------------------------
3960+
# Case 2: Watcher with include_history=False
3961+
# ------------------------------------
3962+
watcher_no_history = await obs.watch(include_history=False)
3963+
3964+
# Should receive no updates immediately
3965+
with pytest.raises(asyncio.TimeoutError):
3966+
await watcher_no_history.updates(timeout=1)
3967+
3968+
# Add a new object after starting the watcher
3969+
await obs.put("D", b"D")
3970+
3971+
# Now the watcher should see this update
3972+
e = await watcher_no_history.updates()
3973+
assert e.name == "D"
3974+
3975+
e = await watcher_no_history.updates()
3976+
assert e is None
3977+
3978+
# No further updates expected
3979+
with pytest.raises(asyncio.TimeoutError):
3980+
await watcher_no_history.updates(timeout=1)
3981+
3982+
await nc.close()
3983+
39153984
@async_test
39163985
async def test_object_list(self):
39173986
errors = []

0 commit comments

Comments
 (0)