-
Notifications
You must be signed in to change notification settings - Fork 215
Adapter: Implements RethinkDB as a source of documents #64
Conversation
Hey, thanks for this The session handling for this is mostly unfinished right now, so i don't see that as being a blocker to getting this in |
You are right, there is no support yet for restarting a changes feed in RethinkDB. There's at least one proposal (rethinkdb/rethinkdb#3471), but it is not yet implemented. That said, as far as I can tell, this is a potential problem with the MongoDB adapter too. Imagine this scenario:
I believe it's mitigated by the fact that the MongoDB source---and this proposed RethinkDB source too---send all documents before tailing the changes feed. Otherwise, we'd need to keep state around between runs of the transporter and implement some kind of snapshotting mechanism. It might be a good idea to consider this at some point, if only because sending all documents is expensive for large tables, but it's also something that would require some architectural rethinking and would make transporter a stateful service, where it's now stateless. The most obvious place that it falls down is deletions. If the transporter restarts and a document is deleted during the downtime, we will not be notified of it and will therefore not send a I decided not to tackle the "deletions during downtime" problem in this PR because I think it will require more fundamental architectural changes, and it is already not well supported in MongoDB. I recommend that we 🏈 it to a new PR, though some documentation around that failure scenario might be appropriate in the mean time. What do you think? :) |
That is true at the moment and some work has begun adaptor-state on being able to optionally add state persistence to transporter but other work and changes to the message.Msg caused us to hold off on it for now.
The current proposal (not really documented other than what has been implemented in the branch) would be that adaptor's would mostly not know or care about State except during startup where the last known good state would be injected into it. The gathering of State happens within the Pipeline and the persistence is something that is controlled within the implemented SessionStore. So, ideally, adaptors would not change much at all other than performing some Resume process on startup. All that to say, this PR should not be held up by that implementation now and even after we have the concept of State, until (rethinkdb/rethinkdb#3471) is available, the RethinkDB adaptor will be unable to resume from a point in time. I have a one maybe two suggestions/changes but I'll make them inline relative to the code. |
@@ -31,10 +33,24 @@ type Rethinkdb struct { | |||
client *gorethink.Session | |||
} | |||
|
|||
// rethinkDbConfig provides custom configuration options for the RethinkDB adapter | |||
type rethinkDbConfig struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be publicly accessible so that it can be properly "Registered" here as:
Register("rethinkdb", "a rethinkdb sink adaptor", NewRethinkdb, RethinkdbConfig{})
@jipperinbham Thanks for the 👀 I've pushed some changes that I believe address your comments. |
👍 |
Adapter: Implements RethinkDB as a source of documents
Similar to the MongoDB adapter, this proposed RethinkDB source sends all documents in the table, then (if configured via the
tail
configuration parameter) watches for changes via RethinkDB's Changefeeds feature.Also implemented is a small change to the ElasticSearch adapter to support
Delete
operations.👀 in the form of code review definitely welcomed.
/cc: @brandon-beacher