Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Process missed blocks on startup #20

Merged
merged 52 commits into from
Sep 13, 2023
Merged

Process missed blocks on startup #20

merged 52 commits into from
Sep 13, 2023

Conversation

cam-schultz
Copy link
Collaborator

@cam-schultz cam-schultz commented Aug 30, 2023

Why this should be merged

Enables the relayer to process warp messages sent while the relayer was offline. Fixes #14

How this works

Adds a ProcessFromHeight method to Subscriber. The EVM implementation calls eth_getLogs to retrieve Warp logs since the last processed block. Adds JSON storage to persist the last processed block.

How this was tested

CI, Teleporter integration tests, added unit tests for JSON storage, manual testing. Will circle back and add end-to-end tests once #15 is complete.

@cam-schultz cam-schultz marked this pull request as ready for review August 30, 2023 14:42
@michaelkaplan13 michaelkaplan13 linked an issue Aug 30, 2023 that may be closed by this pull request
database/json_file_storage.go Outdated Show resolved Hide resolved
}

// Get the latest chain state from the json database, and retrieve the value from the key
func (s *JsonFileStorage) Get(chainID ids.ID, key []byte) ([]byte, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is my understanding correct that Get is only called once on start up to see what the latest processed block height was from the previous runs? Then we relay any messages up to the tip of chain, and then rely on the subscriber to push new logs, so we don't need to know what the latest processed block height at any other point.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's correct.

@@ -84,7 +117,87 @@ func (s *subscriber) forwardLogs() {
continue
}
s.log <- *messageInfo

// Update the database with the latest block height
// TODO: This should also be done in a separate goroutine, rather than waiting for warp messages to be processed
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify here, do you mean "rather than waiting for the write to disk to finish before processing future messages"?

I definitely agree with the TODOs here though, we should address them prior to merging I think. Another edge case to consider is if there are multiple messages in the same block. We should be careful to only update the "latest block height" once we are sure that all messages prior to (and including?) that block have been processed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite, I more meant we should update the latest processed height on disk periodically, rather than only doing so when we process a warp message. I was thinking we'd want to avoid processing large gaps of blocks without warp messages on startup. Though thinking about it some more, I don't think that's even necessary, since the eth_getLogs call on startup won't return any logs for that large gap anyway.

To illustrate, say there's a warp message at block 1000, and the next warp message is at block 2000, but the relayer is offline and unable to process that second message. On restart, whether or not the stored block heigh is 1000 or 1999, the eth_getLogs query will return the same result.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point on updating the latest block height only after all messages have been processed. I'll add that in as part of this PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in the current iteration of this PR, we actually store the latest seen block, not the latest fully processed block (I will rename the variables and keys to be more clear). What this means is that if the relayer falls over while processing a large number of warp messages from a particular block, we'll re-process all of those messages on restart. That's obviously not the most efficient use of gas, as already delivered messages would be re-broadcast, so I created this ticket to address that: #22. IMO we can save that for a separate PR, since it doesn't impact correctness here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah all of that reasoning sounds good to me. I just want to confirm that we don't update the latest seen block until after all messages in that block have been processed. Otherwise, if we update the latest seen block prior to processing all messages in that block, we may end up missing messages in that block if the relayer hits a failure and restarts.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eth_getLogs returns logs from the provided block height inclusive. So say we see block 1000, and only process some of the warp messages contained therein. On restart, we'll reprocess block 1000, potentially re processing warp messages, but definitely not missing any.

relayer/relayer.go Outdated Show resolved Hide resolved
go r.RouteToMessageChannel()

// Initialize the subscriber. This will poll the node for any logs that match the filter query from the stored block height,
// and process the contained warp messages. If initialization fails, continue with normal relayer operation, but log the error.
err = sub.Initialize()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a possibility to still not process messages from all blocks here because the subscription is opened until after we finish Initialize, so there could theoretically be new blocks produced/accepted in between those two events.

I'm not sure how complicated the implementation would be, but I think the desired behavior would be:

  1. Open the subscription to be notified about logs in new blocks.
  2. Get the current block height on chain.
  3. Process messages from any blocks from the "latest processed" height up to the current block height.
  4. Start processing any messages received over the subscription.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I think that should be fine, as long as we grab the latest processed block from the db before opening the subscription. Message order wouldn't be guaranteed, but that's not part of the relayer spec anyway.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated now.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RIght now I think we're running subscribe and processFromHeight in two separate go routines concurrently, so if new blocks came before processFromHeight gets back the latest block height, we'd double process that block right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. I think it would be somewhat tricky to cover all the edge cases here, especially since the eth_subscribe and eth_ blockNumber calls cannot be done atomically. The alternative I'm currently in favor of is implementing #22 to guard against double processing, rather than doing so by synchronizing the various goroutines.

}

// Get the latest processed block height from the database.
heightData, err := s.db.Get(s.chainID, []byte(database.LatestBlockHeightKey))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we handle the "file doesn't exist" error case here for the first initialization of the relayer?

We should clearly document the behavior in this case, but I think it likely makes the most sense to treat the case where the file doesn't exist as the first run of the relayer, and not relay any messages prior to it (since that would involve checking each message from the start of the chain).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Initialize fails for any reason, we simply continue with normal operation, opening subscriptions, relaying messages, etc. So if the file doesn't exist, we don't back-process any warp messages, and only process new incoming ones. OTOH, if the file does exist and has latestProcessedBlock: 1, then yes, we'll deliver every warp message from genesis (though I'm sure eth_getLogs has some limits built in).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right now when we do db.Get it looks to me if a file does not exist, we still return an error, and when the relayer.NewRelayer sees the errror it'll return and not continue processing.

I think we should return a specific file does not exist error from db.Get, and then in NewRelayer if we get that specific error back we return without setting latestSeenBlock and going through processFromHeight

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like

	// Get the latest processed block height from the database.
	latestSeenBlockData, err := r.db.Get(r.sourceChainID, []byte(database.LatestSeenBlockKey))
	if err != nil {
                if err == FileDoesNotExist {
                      r.logger.Info("First run of relayer for chain ID %s, db file does not exist")
                      return &r, sub, nil 
                }
		r.logger.Warn("failed to get latest block from database", zap.Error(err))
		return nil, nil, err
	}
	latestSeenBlock, success := new(big.Int).SetString(string(latestSeenBlockData), 10)
	if !success {
		r.logger.Error("failed to convert latest block to big.Int", zap.Error(err))
		return nil, nil, err
	}

	// Back-process all warp messages from the latest seen block to the latest block
	// This will query the node for any logs that match the filter query from the stored block height,
	// and process the contained warp messages. If initialization fails, continue with normal relayer operation, but log the error.
	err = sub.ProcessFromHeight(latestSeenBlock)
	if err != nil {
		logger.Warn(
			"Encountered an error when processing historical blocks. Continuing to normal relaying operation.",
			zap.Error(err),
		)
	}

	return &r, sub, nil

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. Fixed this now. In the case in which the file doesn't exist, we return the relayer instance, but skip the sub.ProcessFromHeight call.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

though I'm sure eth_getLogs has some limits built in

That's a good call out. We should check this to make sure we handle any page limits properly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this limit is imposed by the deployment or client, and not by the method itself. Most services limit the number of blocks to 500-2000. I'd recommend limiting it to 200 as a conservative starting point. We can reevaluate this figure or add batched calls to accomodate larger ranges in the future, but IMO this is not an immediate concern.


// If the file does not exist, return false, but do not return an error as this
// is an expected case
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would os.ErrNotExist catch? In the comments and it mentions returning a *PathError

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and how should we handle if an error is returned but it's not os.ErrNoExist? Right now we continue to read file

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

os.Stat returns a *PathError that wraps a os.ErrNotExist if the file is not found. See here for an example: https://go.dev/play/p/qFwPDKFqSZS

geoff-vball
geoff-vball previously approved these changes Sep 12, 2023
Copy link
Contributor

@bernard-avalabs bernard-avalabs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


// RelayerDatabase is a key-value store for relayer state, with each chainID maintaining its own state
type RelayerDatabase interface {
Get(chainID ids.ID, key []byte) ([]byte, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we return []byte instead of string directly? When we write in to the file, we convert []byte to string. Also, when we use the result, we convert []byte to string.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this offline, but for visibility, I chose []byte in the interface so that the keys and values could be as general as possible. The underlying JSON databse uses strings so that the resulting file is human readable.

gwen917
gwen917 previously approved these changes Sep 12, 2023
// UpdateLatestSeenBlock retrieves the latest block from the chain and updates the database
UpdateLatestSeenBlock() error
// UpdateLatestProcessedBlock retrieves the latest block from the chain and updates the database
UpdateLatestProcessedBlock() error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Final nit: Could we think of a more descriptive name for this function? My only idea is SetInitialProcessedBlockHeight, to make it clear that it is only used when there is not an existing processed block height on start up, and that it uses whatever the current block height of the chain is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm more in favor of omitting Initial from the function name, since the interface imposes no requirement that this function be called at any particular time in the object lifecycle. I agree it could be more descriptive as to what it's actually updating the database value to. I've renamed it SetProcessedBlockHeightToLatest; let me know if you think another name more accurately reflects the behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, yeah totally agree. Good call 🙏

Copy link
Contributor

@bernard-avalabs bernard-avalabs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}
// Relay the message to the destination. Messages from a given source chain must be processed in serial in order to
// guarantee that the previous block (n-1) is fully processed by the relayer when processing a given log from block n.
err = messageRelayer.relayMessage(warpMessageInfo, r.currentRequestID, messageManager)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a future PR: If we are not already, it might be helpful to capture some performance metrics (e.g., latency, wait time) so that we aren't caught off guard if the system is performing poorly. With our design, a relayer that can't keep up with the request rate may eventually not make any progress if it falls too far behind.

zap.String("chainID", r.sourceChainID.String()),
zap.Error(err),
)
return nil, nil, err
}

// RelayMessage relays a single warp message to the destination chain. Warp message relay requests are concurrent with each other,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Warp message relay requests are concurrent with each other"

Does this need to be updated after the recent change to process messages from a given source serially?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch - updated now

Copy link
Collaborator

@michaelkaplan13 michaelkaplan13 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

geoff-vball
geoff-vball previously approved these changes Sep 13, 2023
@cam-schultz cam-schultz merged commit d73cd3c into main Sep 13, 2023
@cam-schultz cam-schultz deleted the catch-up-blocks branch September 13, 2023 16:04
# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Explore catch up options for relayer downtime
6 participants