Skip to content

Commit 9491531

Browse files
committed
Fix realtime re-subscribing stale data issue
Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended
1 parent 3f2b748 commit 9491531

File tree

3 files changed

+50
-2
lines changed

3 files changed

+50
-2
lines changed

.changeset/wicked-ads-walk.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/react-hooks": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Fixes an issue with realtime when re-subscribing to a run, that would temporarily display stale data and the changes. Now when re-subscribing to a run only the latest changes will be vended

packages/core/src/v3/apiClient/stream.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
114114
},
115115
});
116116

117+
let updatedKeys = new Set<string>();
118+
117119
// Create the transformed stream that processes messages and emits complete rows
118120
this.#changeStream = createAsyncIterableStream(source, {
119121
transform: (messages, controller) => {
@@ -122,9 +124,13 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
122124
}
123125

124126
try {
125-
const updatedKeys = new Set<string>();
127+
let isUpToDate = false;
128+
129+
console.log(`Processing ${messages.length} messages`);
126130

127131
for (const message of messages) {
132+
console.log("shape message", message);
133+
128134
if (isChangeMessage(message)) {
129135
const key = message.key;
130136
switch (message.headers.operation) {
@@ -147,18 +153,27 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
147153
if (message.headers.control === "must-refetch") {
148154
this.#currentState.clear();
149155
this.#error = false;
156+
} else if (message.headers.control === "up-to-date") {
157+
console.log("Setting isUpToDate to true");
158+
isUpToDate = true;
150159
}
151160
}
152161
}
153162

154163
// Now enqueue only one updated row per key, after all messages have been processed.
155-
if (!this.#isStreamClosed) {
164+
// If the stream is not up to date, we don't want to enqueue any rows.
165+
if (!this.#isStreamClosed && isUpToDate) {
156166
for (const key of updatedKeys) {
157167
const finalRow = this.#currentState.get(key);
158168
if (finalRow) {
169+
console.log("enqueueing finalRow", finalRow);
159170
controller.enqueue(finalRow);
160171
}
161172
}
173+
174+
updatedKeys.clear();
175+
} else {
176+
console.log("Not enqueuing any rows because the stream is not up to date");
162177
}
163178
} catch (error) {
164179
console.error("Error processing stream messages:", error);

references/hello-world/src/trigger/realtime.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { logger, runs, task } from "@trigger.dev/sdk";
22
import { helloWorldTask } from "./example.js";
3+
import { setTimeout } from "timers/promises";
34

45
export const realtimeByTagsTask = task({
56
id: "realtime-by-tags",
@@ -32,3 +33,29 @@ export const realtimeByTagsTask = task({
3233
};
3334
},
3435
});
36+
37+
export const realtimeUpToDateTask = task({
38+
id: "realtime-up-to-date",
39+
run: async ({ runId }: { runId?: string }) => {
40+
if (!runId) {
41+
const handle = await helloWorldTask.trigger(
42+
{ hello: "world" },
43+
{
44+
tags: ["hello-world", "realtime"],
45+
}
46+
);
47+
48+
runId = handle.id;
49+
}
50+
51+
logger.info("runId", { runId });
52+
53+
for await (const run of runs.subscribeToRun(runId, { stopOnCompletion: true })) {
54+
logger.info("run", { run });
55+
}
56+
57+
return {
58+
message: "Hello, world!",
59+
};
60+
},
61+
});

0 commit comments

Comments
 (0)