-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[Feature] MongoDB Resume Tokens #196
base: main
Are you sure you want to change the base?
Conversation
🦋 Changeset detectedLatest commit: 18ce978 The changes in this PR will be included in the next version bump. This PR includes changesets to release 14 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
export class ChangeStreamInvalidatedError extends Error { | ||
constructor(message: string) { | ||
super(message); | ||
export class ChangeStreamInvalidatedError extends DatabaseConnectionError { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could do something similar for Postgres and MySQL
@@ -589,6 +595,11 @@ export class ChangeStream { | |||
|
|||
const originalChangeDocument = await stream.tryNext(); | |||
|
|||
// The stream was closed, we will only ever receive `null` from it | |||
if (!originalChangeDocument && stream.closed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed this behaviour when testing dropping a MongoDB database while the PowerSync service was actively replicating. The tryNext
call would emit null
in an endless loop. We typically receive a null
response about every second in normal operations, the frequency is much higher after a DB has been dropped. This typically causes the process to hang.
@@ -85,7 +85,7 @@ export class ChangeStreamTestContext { | |||
} | |||
|
|||
startStreaming() { | |||
this.streamPromise = this.walStream.streamChanges(); | |||
return (this.streamPromise = this.walStream.streamChanges()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This helps catch errors from the streaming process.
An alternative would be to await the dispose
method since that method awaits the streamPromise
. Unfortunately the dispose also aborts the replication, which can make it tricky to detect errors.
Context
Our current MongoDB Change Stream replicator uses the cluster time as the equivalent of an LSN (Log Sequence Number).
Typically, the LSN tracks replication progress and associates user write checkpoints with the active replication state. In MongoDB, the LSN (cluster time) is used as the startAfter value when resuming Change Stream consumption after a restart.
However, relying solely on cluster time can lead to edge cases where replication becomes inconsistent. For example, if replication connection details change after replication has started, the replicator may attempt to consume a Change Stream from a new database using the previously stored cluster time. Since MongoDB does not always return an error in this scenario, replication can silently fail.
MongoDB Change Streams provide Resume Tokens, which are unique to a specific database and can be used to manage cursor positioning more reliably. Using Resume Tokens as the Change Stream cursor should help prevent these inconsistencies.
Feature
This PR introduces support for storing Resume Tokens alongside cluster time in the MongoDB LSN. This approach mirrors how we store the binary log position alongside the GTID for MySQL.
If a Resume Token is available, it will be used when resuming Change Stream consumption. The LSN format remains lexicographically comparable, ensuring compatibility with existing LSNs. Unit tests have been added to verify that comparisons work correctly across both new and old LSN formats, which is critical for user write checkpoints.
Additionally, a unit test has been added to handle scenarios where replication connection details change after replication has started. MongoDB will now behave similarly to Postgres when a replication slot is missing—replication will restart from an initial snapshot with a new set of sync rule data.