Skip to content

Commit

Permalink
fix(sync): fixed proposer by height syncing issue on startup and rota…
Browse files Browse the repository at this point in the history
…tion (#1189)

Co-authored-by: Sergi Rene <sergi@dymension.xyz>
  • Loading branch information
omritoptix and srene authored Nov 4, 2024
1 parent 58b9caf commit 5878792
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 90 deletions.
8 changes: 6 additions & 2 deletions block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package block
import (
"context"
"errors"
"fmt"

tmtypes "github.com/tendermint/tendermint/types"
)

func (m *Manager) RunInitChain(ctx context.Context) error {
// FIXME: We want to get the initial proposer and not current one
proposer := m.SLClient.GetProposer()
// Get the proposer at the initial height. If we're at genesis the height will be 0.
proposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.Height()) + 1)
if err != nil {
return fmt.Errorf("get proposer at height: %w", err)
}
if proposer == nil {
return errors.New("failed to get proposer")
}
Expand Down
27 changes: 19 additions & 8 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ import (
uchannel "github.com/dymensionxyz/dymint/utils/channel"
)

// Run modes for the node. The active mode is recorded on the Manager
// during initialization and is not expected to change afterwards.
const (
	// RunModeProposer represents a node running as a proposer.
	RunModeProposer uint = iota
	// RunModeFullNode represents a node running as a full node.
	RunModeFullNode
)

// Manager is responsible for aggregating transactions into blocks.
type Manager struct {
logger types.Logger
Expand All @@ -55,6 +62,9 @@ type Manager struct {
DAClient da.DataAvailabilityLayerClient
SLClient settlement.ClientI

// RunMode represents the mode of the node. Set during initialization and shouldn't change after that.
RunMode uint

/*
Sequencer and full-node
*/
Expand Down Expand Up @@ -194,12 +204,11 @@ func (m *Manager) Start(ctx context.Context) error {
// Check if a proposer on the rollapp is set. In case no proposer is set on the Rollapp, fallback to the hub proposer (If such exists).
// No proposer on the rollapp means that at some point there was no available proposer.
// In case there is also no proposer on the hub to our current height, it means that the chain is halted.
// FIXME: In case we are syncing we would like to get the proposer from the hub relevant to the current height.
if m.State.GetProposer() == nil {
m.logger.Info("No proposer on the rollapp, fallback to the hub proposer, if available")
SLProposer := m.SLClient.GetProposer()
if SLProposer == nil {
return fmt.Errorf("no proposer available. chain is halted")
SLProposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.Height()))
if err != nil {
return fmt.Errorf("get proposer at height: %w", err)
}
m.State.SetProposer(SLProposer)
}
Expand All @@ -208,12 +217,14 @@ func (m *Manager) Start(ctx context.Context) error {
// In case of sequencer rotation, there's a phase where proposer rotated on Rollapp but hasn't yet rotated on hub.
// for this case, 2 nodes will get `true` for `AmIProposer` so the l2 proposer can produce blocks and the hub proposer can submit his last batch.
// The hub proposer, after sending the last state update, will panic and restart as full node.
amIProposer := m.AmIProposerOnSL() || m.AmIProposerOnRollapp()

m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[amIProposer])
amIProposerOnSL, err := m.AmIProposerOnSL()
if err != nil {
return fmt.Errorf("am i proposer on SL: %w", err)
}
amIProposer := amIProposerOnSL || m.AmIProposerOnRollapp()

// update local state from latest state in settlement
err := m.updateFromLastSettlementState()
err = m.updateFromLastSettlementState()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions block/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

// setFraudHandler sets the fraud handler for the block manager.
func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "full node")
m.RunMode = RunModeFullNode
// update latest finalized height
err := m.updateLastFinalizedHeightFromSettlement()
if err != nil {
Expand All @@ -36,6 +38,8 @@ func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {
}

func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "proposer")
m.RunMode = RunModeProposer
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)

Expand Down
3 changes: 1 addition & 2 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case retainHeight := <-m.pruningC:

var pruningHeight uint64
if m.AmIProposerOnSL() || m.AmIProposerOnRollapp() { // do not delete anything that we might submit in future
if m.RunMode == RunModeProposer { // do not delete anything that we might submit in future
pruningHeight = min(m.NextHeightToSubmit(), uint64(retainHeight))
} else { // do not delete anything that is not validated yet
pruningHeight = min(m.SettlementValidator.NextValidationHeight(), uint64(retainHeight))
Expand Down
27 changes: 11 additions & 16 deletions block/sequencers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,16 @@ func (m *Manager) MonitorSequencerSetUpdates(ctx context.Context) error {
}
}

// AmIPropoesr checks if the the current node is the proposer either on L2 or on the hub.
// In case of sequencer rotation, there's a phase where proposer rotated on L2 but hasn't yet rotated on hub.
// for this case, 2 nodes will get `true` for `AmIProposer` so the l2 proposer can produce blocks and the hub proposer can submit his last batch.
func (m *Manager) AmIProposer() bool {
return m.AmIProposerOnSL() || m.AmIProposerOnRollapp()
}

// AmIProposerOnSL checks if the current node is the proposer on the hub
// Proposer on the Hub is not necessarily the proposer on the L2 during rotation phase.
func (m *Manager) AmIProposerOnSL() bool {
// Proposer on the Hub is not necessarily the proposer on the Rollapp during rotation phase.
func (m *Manager) AmIProposerOnSL() (bool, error) {
localProposerKeyBytes, _ := m.LocalKey.GetPublic().Raw()

// get hub proposer key
var hubProposerKeyBytes []byte
hubProposer := m.SLClient.GetProposer()
if hubProposer != nil {
hubProposerKeyBytes = hubProposer.PubKey().Bytes()
SLProposer, err := m.SLClient.GetProposerAtHeight(-1)
if err != nil {
return false, fmt.Errorf("get proposer at height: %w", err)
}
return bytes.Equal(hubProposerKeyBytes, localProposerKeyBytes)
return bytes.Equal(SLProposer.PubKey().Bytes(), localProposerKeyBytes), nil
}

// AmIProposerOnRollapp checks if the current node is the proposer on the rollapp.
Expand All @@ -90,7 +81,11 @@ func (m *Manager) ShouldRotate() (bool, error) {
}
// At this point we know that there is a next proposer,
// so we should rotate only if we are the current proposer on the hub
return m.AmIProposerOnSL(), nil
amIProposerOnSL, err := m.AmIProposerOnSL()
if err != nil {
return false, fmt.Errorf("am i proposer on SL: %w", err)
}
return amIProposerOnSL, nil
}

// rotate rotates current proposer by doing the following:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 59 additions & 35 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"slices"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -50,7 +49,6 @@ type Client struct {
rollappQueryClient rollapptypes.QueryClient
sequencerQueryClient sequencertypes.QueryClient
protoCodec *codec.ProtoCodec
proposer types.Sequencer
retryAttempts uint
retryMinDelay time.Duration
retryMaxDelay time.Duration
Expand Down Expand Up @@ -312,49 +310,53 @@ func (c *Client) GetLatestFinalizedHeight() (uint64, error) {
return res.Height, nil
}

// GetProposer implements settlement.ClientI.
func (c *Client) GetProposer() *types.Sequencer {
// return cached proposer
if !c.proposer.IsEmpty() {
return &c.proposer
}

// GetProposerAtHeight return the proposer at height.
// In case of negative height, it will return the latest proposer.
func (c *Client) GetProposerAtHeight(height int64) (*types.Sequencer, error) {
// Get all sequencers to find the proposer address
seqs, err := c.GetBondedSequencers()
if err != nil {
c.logger.Error("GetBondedSequencers", "error", err)
return nil
return nil, fmt.Errorf("get bonded sequencers: %w", err)
}

// Get either latest proposer or proposer at height
var proposerAddr string
err = c.RunWithRetry(func() error {
reqProposer := &sequencertypes.QueryGetProposerByRollappRequest{
RollappId: c.rollappId,
}
res, err := c.sequencerQueryClient.GetProposerByRollapp(c.ctx, reqProposer)
if err == nil {
proposerAddr = res.ProposerAddr
return nil
if height < 0 {
proposerAddr, err = c.getLatestProposer()
if err != nil {
return nil, fmt.Errorf("get latest proposer: %w", err)
}
if status.Code(err) == codes.NotFound {
return nil
} else {
// Get the state info for the relevant height and get address from there
res, err := c.GetBatchAtHeight(uint64(height))
// In case the height is not found, it may be because the batch hasn't arrived at the hub yet.
// In that case we want to return the current proposer.
if err != nil {
// If batch not found, fallback to latest proposer
if errors.Is(err, gerrc.ErrNotFound) {
proposerAddr, err = c.getLatestProposer()
if err != nil {
return nil, fmt.Errorf("get latest proposer: %w", err)
}
} else {
return nil, fmt.Errorf("get batch at height: %w", err)
}
} else {
proposerAddr = res.Batch.Sequencer
}
return err
})
if err != nil {
c.logger.Error("GetProposer", "error", err)
return nil
}

// find the sequencer with the proposer address
index := slices.IndexFunc(seqs, func(seq types.Sequencer) bool {
return seq.SettlementAddress == proposerAddr
})
// will return nil if the proposer is not set
if index == -1 {
return nil
if proposerAddr == "" {
return nil, fmt.Errorf("proposer address is empty")
}
c.proposer = seqs[index]
return &seqs[index]

// Find and return the matching sequencer
for _, seq := range seqs {
if seq.SettlementAddress == proposerAddr {
return &seq, nil
}
}
return nil, fmt.Errorf("proposer not found")
}

// GetSequencerByAddress returns a sequencer by its address.
Expand Down Expand Up @@ -642,3 +644,25 @@ func (c *Client) pollForBatchInclusion(batchEndHeight uint64) (bool, error) {

return latestBatch.Batch.EndHeight == batchEndHeight, nil
}

// getLatestProposer returns the settlement address of the current proposer
// for the rollapp, as reported by the hub's sequencer query service.
// A gRPC NotFound response is not treated as a failure: it yields an empty
// address and a nil error, meaning no proposer is currently set on the hub.
// Callers must therefore check for an empty result, not only for an error.
func (c *Client) getLatestProposer() (string, error) {
	var proposerAddr string
	err := c.RunWithRetry(func() error {
		reqProposer := &sequencertypes.QueryGetProposerByRollappRequest{
			RollappId: c.rollappId,
		}
		res, err := c.sequencerQueryClient.GetProposerByRollapp(c.ctx, reqProposer)
		if err == nil {
			proposerAddr = res.ProposerAddr
			return nil
		}
		// NotFound means the rollapp has no proposer set on the hub:
		// stop retrying and report success with an empty address.
		if status.Code(err) == codes.NotFound {
			return nil
		}
		// Any other error is returned so RunWithRetry can retry the query.
		return err
	})
	if err != nil {
		return "", err
	}
	return proposerAddr, nil
}
28 changes: 18 additions & 10 deletions settlement/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strconv"
"sync/atomic"
Expand Down Expand Up @@ -242,29 +243,27 @@ func (c *Client) GetBatchAtHeight(h uint64) (*settlement.ResultRetrieveBatch, er
return nil, gerrc.ErrNotFound
}

// GetProposer implements settlement.ClientI.
func (c *Client) GetProposer() *types.Sequencer {
// GetProposerAtHeight implements settlement.ClientI.
func (c *Client) GetProposerAtHeight(height int64) (*types.Sequencer, error) {
pubKeyBytes, err := hex.DecodeString(c.ProposerPubKey)
if err != nil {
return nil
return nil, fmt.Errorf("decode proposer pubkey: %w", err)
}
var pubKey cryptotypes.PubKey = &ed25519.PubKey{Key: pubKeyBytes}
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(pubKey)
if err != nil {
c.logger.Error("Error converting to tendermint pubkey", "err", err)
return nil
return nil, fmt.Errorf("convert to tendermint pubkey: %w", err)
}
settlementAddr, err := bech32.ConvertAndEncode(addressPrefix, pubKeyBytes)
if err != nil {
c.logger.Error("Error converting pubkey to settlement address", "err", err)
return nil
return nil, fmt.Errorf("convert pubkey to settlement address: %w", err)
}
return types.NewSequencer(
tmPubKey,
settlementAddr,
settlementAddr,
[]string{},
)
), nil
}

// GetSequencerByAddress returns all sequencer information by its address. Not implemented since it will not be used in grpc SL
Expand All @@ -279,7 +278,11 @@ func (c *Client) GetAllSequencers() ([]types.Sequencer, error) {

// GetBondedSequencers implements settlement.ClientI.
func (c *Client) GetBondedSequencers() ([]types.Sequencer, error) {
return []types.Sequencer{*c.GetProposer()}, nil
proposer, err := c.GetProposerAtHeight(-1)
if err != nil {
return nil, fmt.Errorf("get proposer at height: %w", err)
}
return []types.Sequencer{*proposer}, nil
}

// GetNextProposer implements settlement.ClientI.
Expand Down Expand Up @@ -337,8 +340,13 @@ func (c *Client) convertBatchtoSettlementBatch(batch *types.Batch, daResult *da.
bds = append(bds, bd)
}

proposer, err := c.GetProposerAtHeight(0)
if err != nil {
panic(err)
}

settlementBatch := &settlement.Batch{
Sequencer: c.GetProposer().SettlementAddress,
Sequencer: proposer.SettlementAddress,
StartHeight: batch.StartHeight(),
EndHeight: batch.EndHeight(),
MetaData: &settlement.BatchMetaData{
Expand Down
Loading

0 comments on commit 5878792

Please # to comment.