DB Manager #263
Conversation
```go
		return err
	}

	// Increment the request ID for the next message relay request
	lstnr.currentRequestID++
	return nil
```
This was an existing bug.
Still going through, but I'll submit these comments now so you can start thinking about them.
database/manager.go (outdated)

```go
for height := range km.finished {
	counter, ok := km.queuedHeightsAndMessages[height]
	if !ok {
		continue
	}
	counter.processedMessages++
	if counter.processedMessages == counter.totalMessages {
		km.commitHeight(height)
		delete(km.queuedHeightsAndMessages, height)
	}
}
```
So I'm pretty sure we have a race condition in here. Let's say the current highest processed height is 99 and we are currently processing blocks 100 and 101. If 101 finishes processing before block 100, we will update the highest processed block to 101 while not having processed 100 yet. We have to check queuedHeightsAndMessages to make sure there is no lower height still being processed.

We could do something like... Let's say our highest processed block is N. If the height we're committing is N+1, set the value to that. Otherwise, throw that block height into a min-heap. Then when we finally process block N+1, we look in the min-heap for N+2, and keep removing values from the heap as long as we have the next block in sequence.
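A minimal, self-contained sketch of the ordering the reviewer describes; the heightHeap type and the trimmed-down keyManager are illustrative stand-ins, not the PR's actual types:

```go
package main

import (
	"container/heap"
	"fmt"
)

// heightHeap is a min-heap of block heights.
type heightHeap []uint64

func (h heightHeap) Len() int           { return len(h) }
func (h heightHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h heightHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *heightHeap) Push(x any)        { *h = append(*h, x.(uint64)) }
func (h *heightHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// keyManager here carries only the fields needed for the sketch.
type keyManager struct {
	maxCommittedHeight uint64
	pendingHeights     heightHeap
}

// commitHeight only advances maxCommittedHeight when heights arrive in
// sequence; out-of-order heights are buffered in the min-heap until the
// gap below them is filled.
func (km *keyManager) commitHeight(height uint64) {
	if height != km.maxCommittedHeight+1 {
		heap.Push(&km.pendingHeights, height)
		return
	}
	km.maxCommittedHeight = height
	// Drain buffered heights that are now contiguous.
	for km.pendingHeights.Len() > 0 && km.pendingHeights[0] == km.maxCommittedHeight+1 {
		km.maxCommittedHeight = heap.Pop(&km.pendingHeights).(uint64)
	}
}

func main() {
	km := &keyManager{maxCommittedHeight: 99}
	km.commitHeight(101)               // buffered: 100 hasn't finished yet
	km.commitHeight(100)               // commits 100, then drains 101
	fmt.Println(km.maxCommittedHeight) // 101
}
```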
If I'm understanding the code correctly, it does look like the commitHeight function needs to do some buffering to avoid gaps in the committed block height.
A circular buffer could be used in place of a min heap if the difference between the highest processed block and the current committed height is bounded.
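If the gap between the highest finished block and the committed height is bounded (e.g. by the number of in-flight blocks), the same idea fits in a fixed-size ring. A rough sketch with made-up names, purely to illustrate the alternative:

```go
package ringbuf

// ringCommitter tracks finished heights in a fixed-size circular buffer.
// It assumes no finished height is ever more than len(finished)-1 ahead
// of the last committed height.
type ringCommitter struct {
	committed uint64 // highest height committed in order
	finished  []bool // finished[h % len(finished)] marks height h as done
}

func newRingCommitter(committed uint64, window int) *ringCommitter {
	return &ringCommitter{committed: committed, finished: make([]bool, window)}
}

// markFinished records that height is done and advances the committed
// height across any contiguous run of finished heights.
func (r *ringCommitter) markFinished(height uint64) {
	r.finished[height%uint64(len(r.finished))] = true
	for {
		next := r.committed + 1
		idx := next % uint64(len(r.finished))
		if !r.finished[idx] {
			return
		}
		r.finished[idx] = false
		r.committed = next
	}
}
```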
That's a great catch. I've implemented the min-heap based approach you suggested.

On a related note, I'm debating how we should handle the case in which a block height never calls commitHeight. In that case, the min-heap would grow unbounded, and no new data would be written to the DB. One approach would be to kill the entire application if the min-heap grows too large, and attempt to re-process the missing block on restart. If the issue persists, the operator would have to intervene by either setting process-missed-blocks to false, or manually updating the DB. One thing I like about this approach is that all blocks up to the height stored in the DB are guaranteed to be processed; only with manual intervention can this be made untrue.
Hmmm... Yeah, I wonder what the best approach would be here to handle the case of a missing block. I don't really want to kill the whole application over one missing block for one application.

The approach that comes to mind is setting some threshold, say 100 blocks: if the min-heap grows bigger than that, we just pop the min value from the heap, set it as our most recently processed block, and throw an error for any missed blocks. We could also write the missed blocks to a table somewhere for potential reprocessing later.
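Building on the min-heap sketch above, that threshold fallback could look something like this; the maxPendingHeights constant and the returned missed slice are illustrative, not part of the PR:

```go
// maxPendingHeights is an illustrative cap on how many out-of-order heights
// we are willing to buffer before skipping ahead.
const maxPendingHeights = 100

// flushIfStuck gives up waiting for a missing height once the buffer grows
// past the threshold: it jumps to the smallest buffered height and returns
// the heights that were skipped so the caller can log them or record them
// for later reprocessing.
func (km *keyManager) flushIfStuck() (missed []uint64) {
	if km.pendingHeights.Len() <= maxPendingHeights {
		return nil
	}
	lowest := heap.Pop(&km.pendingHeights).(uint64)
	for h := km.maxCommittedHeight + 1; h < lowest; h++ {
		missed = append(missed, h)
	}
	km.maxCommittedHeight = lowest
	// Drain any heights now contiguous with the new committed height.
	for km.pendingHeights.Len() > 0 && km.pendingHeights[0] == km.maxCommittedHeight+1 {
		km.maxCommittedHeight = heap.Pop(&km.pendingHeights).(uint64)
	}
	return missed
}
```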
database/manager.go (outdated)

```go
func (km *keyManager) commitHeight(height uint64) {
	if height > km.maxCommittedHeight {
		km.logger.Debug("committing height", zap.Uint64("height", height))
		km.maxCommittedHeight = height
	}
}
```
commitHeight sounds to me like it should be writing to the database. Maybe heightProcessed?
A lot of the comments assume we're going to be processing logs async, but I know they're currently still synchronous per source chain.
relayer/listener.go (outdated)

```go
if err != nil {
	return err
}
relayer.dbManager.PrepareHeight(relayer.relayerID, height, 0)
```
I don't think we should be using PrepareHeight here. Maybe a separate helper that force-sets the height to latest.
keyManager.prepareHeight immediately commits the height if the number of messages to process is 0. I don't think we should directly expose commitHeight to dbManager callers. I can add a separate method with a clearer name that makes this call internally, though.
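For illustration, the zero-message case described here would look roughly like the following; the messageCounter type is assumed from the field names in the diff above, and this is a sketch rather than the PR's actual code:

```go
// prepareHeight registers a height along with the number of messages expected
// at that height. With zero expected messages there is nothing to wait for,
// so the height is committed right away.
func (km *keyManager) prepareHeight(height uint64, totalMessages uint64) {
	if totalMessages == 0 {
		km.commitHeight(height)
		return
	}
	km.queuedHeightsAndMessages[height] = &messageCounter{
		totalMessages: totalMessages,
	}
}
```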
So with the min-heap implemented, prepareHeight with 0 messages should only write the height if the current height is N and we commit N+1. We need a separate helper to force-set the height to latest here.
commitHeight automatically commits if km.maxCommittedHeight == 0, which is the case on initialization, and should be the case when this function is called, since it is called before the main processing loop in ProcessLogs is initiated.

I agree that this is not at all clear though. I'll add a more specific method that can be called here, and update commitHeight to accept a "force" option.
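A rough idea of what that force option might look like, again in terms of the simplified keyManager sketch above rather than the actual implementation:

```go
// commitHeight records a finished height. force bypasses the in-order check,
// e.g. when seeding the starting height to the latest block before the
// processing loop starts; the zero-value check preserves the existing
// behavior of committing unconditionally on initialization.
func (km *keyManager) commitHeight(height uint64, force bool) {
	if force || km.maxCommittedHeight == 0 {
		km.maxCommittedHeight = height
		return
	}
	if height != km.maxCommittedHeight+1 {
		heap.Push(&km.pendingHeights, height)
		return
	}
	km.maxCommittedHeight = height
	for km.pendingHeights.Len() > 0 && km.pendingHeights[0] == km.maxCommittedHeight+1 {
		km.maxCommittedHeight = heap.Pop(&km.pendingHeights).(uint64)
	}
}
```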
database/manager.go (outdated)

```go
// This function should only be called once.
func (dm *DatabaseManager) Run() {
	for range time.Tick(dm.interval) {
		for id, km := range dm.keyManagers {
```
can you add a comment about what happens if this inner for loop takes longer than the tick interval?
Actually, from a quick read-up it seems like the ticks continue in the background, and if the underlying for loop hasn't completed, the tick gets dropped and the loop waits for the next one. Is that a problem at all for short durations?
The tick interval is on the order of seconds (or tens of seconds), so the only expected scenario in which this could occur is if there are network issues reaching the remote database. Even so, this isn't really a problem, since a missed write to the database is perfectly safe.
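For context on the ticker behavior discussed here: time.Tick delivers ticks on a channel that buffers at most one pending tick, so ticks that fire while the loop body is still running are dropped rather than queued. An annotated version of the loop; dm.flushHeights is a hypothetical stand-in for the per-keyManager writes:

```go
// Run periodically flushes committed heights to the database.
// This function should only be called once.
func (dm *DatabaseManager) Run() {
	// time.Tick's channel holds at most one pending tick. If a flush takes
	// longer than dm.interval, the intervening ticks are dropped and the
	// next flush simply happens on the next delivered tick. A missed write
	// is safe because committed heights are retained in memory.
	for range time.Tick(dm.interval) {
		dm.flushHeights() // hypothetical helper wrapping the inner loop over dm.keyManagers
	}
}
```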
database/manager.go (outdated)

```go
)
err = dm.db.Put(id.ID, LatestProcessedBlockKey, []byte(strconv.FormatUint(km.maxCommittedHeight, 10)))
if err != nil {
	dm.logger.Error("Failed to write latest processed block height", zap.Error(err))
```
do the key managers have some identifier that can be logged here too?
How are you thinking of detecting if one of the many key managers is in a stuck state and can't make progress when the loop gets to its iteration?
> do the key managers have some identifier that can be logged here too?

Added this to the log.

> how are you thinking of detecting if one of the many key managers is in a stuck state and can't make progress when the loop gets to its iteration?

I'm not sure what you mean. In the context of this function, the key managers are read-only, so the database manager would always be able to progress to the next iteration.
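For the identifier question above, the log call could simply carry the relayer ID alongside the error, along these lines (assuming id.ID implements fmt.Stringer; otherwise zap.Any would do):

```go
dm.logger.Error(
	"Failed to write latest processed block height",
	zap.Stringer("relayerID", id.ID),
	zap.Error(err),
)
```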
I've left a few concurrency-related comments.
LGTM
LGTM
Why this should be merged

Fixes #234 and unblocks #31.

The relaying logic of an Application Relayer is largely parallelizable. Specifically, parsing Warp messages, aggregating signatures, and constructing and issuing transactions require no synchronization. The one operation that does is database access. Note that the RelayerDatabase interface assumes that writes are thread safe, but it makes no assumptions about the consistency of concurrent writes. In its current form, this is an issue because concurrent Application Relayer worker threads may complete blocks out of order.

How this works

To support consistent writes, this PR introduces a database manager layer that prepares and commits writes once it is sure that there are no more pending jobs for a particular block height. It accomplishes this with the following changes:

- Subscribers now subscribe to new blocks, rather than relevant Warp logs. After a new block is received, the number of relevant Warp logs for each Application Relayer is counted, and the logs are then dispatched for processing.
- Adds a keyManager type that is initialized with the number of expected jobs and commits a height to the database once all expected jobs have completed. This decouples database write consistency from the order in which messages are processed. (By comparison, the existing implementation writes the currently processing message's height to the database, which requires that messages be processed serially in block order.)
- Adds a DatabaseManager type that periodically writes committed block heights to the associated key in the database. The write interval is configurable.

How this was tested

CI

How is this documented

Updated README