Skip to content

Commit

Permalink
[Deno] Improve Ack Ack (#239)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jul 23, 2024
1 parent 115f8a6 commit c182c3d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 55 deletions.
40 changes: 14 additions & 26 deletions examples/jetstream/ack-ack/deno/main.js
Original file line number Diff line number Diff line change
@@ -1,56 +1,44 @@
import {
AckPolicy,
StorageType,
nuid,
connect
} from "https://deno.land/x/nats@v1.28.0/src/mod.ts";

const servers = Deno.env.get("NATS_URL")?.split(",");

const nc = await connect({ servers });

// create a stream with a random name with some messages and a consumer
const stream = "confirmAckStream";
const subject = "confirmAckSubject";

const jsm = await nc.jetstreamManager();
const js = nc.jetstream();

// Create a stream
// (remove the stream first so we have a clean starting point)
try {
await jsm.streams.delete(stream);
} catch (err) {
if (err.code != 404) {
console.error(err.message);
}
}

// Create the stream
const subj = nuid.next();
const name = `EVENTS_${subj}`;
await jsm.streams.add({
name: stream,
subjects: [subject],
storage: StorageType.Memory,
name,
subjects: [subj]
});

// Publish a couple messages so we can look at the state
await js.publish(subject)
await js.publish(subject)
await js.publish(subj)
await js.publish(subj)

// Consume a message with 2 different consumers
// The first consumer will (regular) ack without confirmation
// The second consumer will ackSync which confirms that ack was handled.

// Consumer 1 will use ack()
const ci1 = await jsm.consumers.add(stream, {
const ci1 = await jsm.consumers.add(name, {
name: "consumer1",
filter_subject: subject,
filter_subject: subj,
ack_policy: AckPolicy.Explicit
});
console.log("Consumer 1");
console.log(" Start");
console.log(` pending messages: ${ci1.num_pending}`);
console.log(` messages with ack pending: ${ci1.num_ack_pending}`);

const consumer1 = await js.consumers.get(stream, "consumer1");
const consumer1 = await js.consumers.get(name, "consumer1");

try {
const m = await consumer1.next();
Expand All @@ -72,17 +60,17 @@ try {


// Consumer 2 will use ackAck()
const ci2 = await jsm.consumers.add(stream, {
const ci2 = await jsm.consumers.add(name, {
name: "consumer2",
filter_subject: subject,
filter_subject: subj,
ack_policy: AckPolicy.Explicit
});
console.log("Consumer 2");
console.log(" Start");
console.log(` pending messages: ${ci2.num_pending}`);
console.log(` messages with ack pending: ${ci2.num_ack_pending}`);

const consumer2 = await js.consumers.get(stream, "consumer2");
const consumer2 = await js.consumers.get(name, "consumer2");

try {
const m = await consumer2.next();
Expand Down
9 changes: 0 additions & 9 deletions examples/jetstream/ack-ack/deno/output.cast

This file was deleted.

20 changes: 0 additions & 20 deletions examples/jetstream/ack-ack/deno/output.txt

This file was deleted.

1 comment on commit c182c3d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for nats-by-example ready!

✅ Preview
https://nats-by-example-391jjw4ln-connecteverything.vercel.app

Built with commit c182c3d.
This pull request is being automatically deployed with vercel-action

Please # to comment.