From 587879246554250031ae304d7e47884763144074 Mon Sep 17 00:00:00 2001 From: Omri Date: Mon, 4 Nov 2024 10:10:30 +0100 Subject: [PATCH] fix(sync): fixed proposer by height syncing issue on startup and rotation (#1189) Co-authored-by: Sergi Rene --- block/initchain.go | 8 +- block/manager.go | 27 ++++-- block/modes.go | 4 + block/pruning.go | 3 +- block/sequencers.go | 27 +++--- .../dymint/settlement/mock_ClientI.go | 15 ++- settlement/dymension/dymension.go | 94 ++++++++++++------- settlement/grpc/grpc.go | 28 ++++-- settlement/local/local.go | 26 +++-- settlement/local/local_test.go | 3 +- settlement/settlement.go | 4 +- 11 files changed, 149 insertions(+), 90 deletions(-) diff --git a/block/initchain.go b/block/initchain.go index 926724633..9e043ec01 100644 --- a/block/initchain.go +++ b/block/initchain.go @@ -3,13 +3,17 @@ package block import ( "context" "errors" + "fmt" tmtypes "github.com/tendermint/tendermint/types" ) func (m *Manager) RunInitChain(ctx context.Context) error { - // FIXME: We want to get the initial proposer and not current one - proposer := m.SLClient.GetProposer() + // Get the proposer at the initial height. If we're at genesis the height will be 0. + proposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.Height()) + 1) + if err != nil { + return fmt.Errorf("get proposer at height: %w", err) + } if proposer == nil { return errors.New("failed to get proposer") } diff --git a/block/manager.go b/block/manager.go index 6c46dffbb..fdd9133a6 100644 --- a/block/manager.go +++ b/block/manager.go @@ -34,6 +34,13 @@ import ( uchannel "github.com/dymensionxyz/dymint/utils/channel" ) +const ( + // RunModeProposer represents a node running as a proposer + RunModeProposer uint = iota + // RunModeFullNode represents a node running as a full node + RunModeFullNode +) + // Manager is responsible for aggregating transactions into blocks. type Manager struct { logger types.Logger @@ -55,6 +62,9 @@ type Manager struct { DAClient da.DataAvailabilityLayerClient SLClient settlement.ClientI + // RunMode represents the mode of the node. Set during initialization and shouldn't change after that. + RunMode uint + /* Sequencer and full-node */ @@ -194,12 +204,11 @@ func (m *Manager) Start(ctx context.Context) error { // Check if a proposer on the rollapp is set. In case no proposer is set on the Rollapp, fallback to the hub proposer (If such exists). // No proposer on the rollapp means that at some point there was no available proposer. // In case there is also no proposer on the hub to our current height, it means that the chain is halted. - // FIXME: In case we are syncing we would like to get the proposer from the hub relevant to the current height. if m.State.GetProposer() == nil { m.logger.Info("No proposer on the rollapp, fallback to the hub proposer, if available") - SLProposer := m.SLClient.GetProposer() - if SLProposer == nil { - return fmt.Errorf("no proposer available. chain is halted") + SLProposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.Height())) + if err != nil { + return fmt.Errorf("get proposer at height: %w", err) } m.State.SetProposer(SLProposer) } @@ -208,12 +217,14 @@ func (m *Manager) Start(ctx context.Context) error { // In case of sequencer rotation, there's a phase where proposer rotated on Rollapp but hasn't yet rotated on hub. // for this case, 2 nodes will get `true` for `AmIProposer` so the l2 proposer can produce blocks and the hub proposer can submit his last batch. // The hub proposer, after sending the last state update, will panic and restart as full node. - amIProposer := m.AmIProposerOnSL() || m.AmIProposerOnRollapp() - - m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[amIProposer]) + amIProposerOnSL, err := m.AmIProposerOnSL() + if err != nil { + return fmt.Errorf("am i proposer on SL: %w", err) + } + amIProposer := amIProposerOnSL || m.AmIProposerOnRollapp() // update local state from latest state in settlement - err := m.updateFromLastSettlementState() + err = m.updateFromLastSettlementState() if err != nil { return fmt.Errorf("sync block manager from settlement: %w", err) } diff --git a/block/modes.go b/block/modes.go index 1c8c3fd60..08f415b2a 100644 --- a/block/modes.go +++ b/block/modes.go @@ -13,6 +13,8 @@ import ( // setFraudHandler sets the fraud handler for the block manager. func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error { + m.logger.Info("starting block manager", "mode", "full node") + m.RunMode = RunModeFullNode // update latest finalized height err := m.updateLastFinalizedHeightFromSettlement() if err != nil { @@ -36,6 +38,8 @@ func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error { } func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error { + m.logger.Info("starting block manager", "mode", "proposer") + m.RunMode = RunModeProposer // Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet. go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger) diff --git a/block/pruning.go b/block/pruning.go index 06c2ce4c5..8781cc1ed 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -42,9 +42,8 @@ func (m *Manager) PruningLoop(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case retainHeight := <-m.pruningC: - var pruningHeight uint64 - if m.AmIProposerOnSL() || m.AmIProposerOnRollapp() { // do not delete anything that we might submit in future + if m.RunMode == RunModeProposer { // do not delete anything that we might submit in future pruningHeight = min(m.NextHeightToSubmit(), uint64(retainHeight)) } else { // do not delete anything that is not validated yet pruningHeight = min(m.SettlementValidator.NextValidationHeight(), uint64(retainHeight)) diff --git a/block/sequencers.go b/block/sequencers.go index a32db3865..e560e5c14 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -48,25 +48,16 @@ func (m *Manager) MonitorSequencerSetUpdates(ctx context.Context) error { } } -// AmIPropoesr checks if the the current node is the proposer either on L2 or on the hub. -// In case of sequencer rotation, there's a phase where proposer rotated on L2 but hasn't yet rotated on hub. -// for this case, 2 nodes will get `true` for `AmIProposer` so the l2 proposer can produce blocks and the hub proposer can submit his last batch. -func (m *Manager) AmIProposer() bool { - return m.AmIProposerOnSL() || m.AmIProposerOnRollapp() -} - // AmIProposerOnSL checks if the current node is the proposer on the hub -// Proposer on the Hub is not necessarily the proposer on the L2 during rotation phase. -func (m *Manager) AmIProposerOnSL() bool { +// Proposer on the Hub is not necessarily the proposer on the Rollapp during rotation phase. +func (m *Manager) AmIProposerOnSL() (bool, error) { localProposerKeyBytes, _ := m.LocalKey.GetPublic().Raw() - // get hub proposer key - var hubProposerKeyBytes []byte - hubProposer := m.SLClient.GetProposer() - if hubProposer != nil { - hubProposerKeyBytes = hubProposer.PubKey().Bytes() + SLProposer, err := m.SLClient.GetProposerAtHeight(-1) + if err != nil { + return false, fmt.Errorf("get proposer at height: %w", err) } - return bytes.Equal(hubProposerKeyBytes, localProposerKeyBytes) + return bytes.Equal(SLProposer.PubKey().Bytes(), localProposerKeyBytes), nil } // AmIProposerOnRollapp checks if the current node is the proposer on the rollapp. @@ -90,7 +81,11 @@ func (m *Manager) ShouldRotate() (bool, error) { } // At this point we know that there is a next proposer, // so we should rotate only if we are the current proposer on the hub - return m.AmIProposerOnSL(), nil + amIProposerOnSL, err := m.AmIProposerOnSL() + if err != nil { + return false, fmt.Errorf("am i proposer on SL: %w", err) + } + return amIProposerOnSL, nil } // rotate rotates current proposer by doing the following: diff --git a/mocks/github.com/dymensionxyz/dymint/settlement/mock_ClientI.go b/mocks/github.com/dymensionxyz/dymint/settlement/mock_ClientI.go index 3e3bcc9b6..8aae5c592 100644 --- a/mocks/github.com/dymensionxyz/dymint/settlement/mock_ClientI.go +++ b/mocks/github.com/dymensionxyz/dymint/settlement/mock_ClientI.go @@ -481,11 +481,11 @@ func (_c *MockClientI_GetLatestHeight_Call) RunAndReturn(run func() (uint64, err } // GetProposer provides a mock function with given fields: -func (_m *MockClientI) GetProposer() *types.Sequencer { - ret := _m.Called() +func (_m *MockClientI) GetProposerAtHeight(height int64) (*types.Sequencer, error) { + ret := _m.Called(height) if len(ret) == 0 { - panic("no return value specified for GetProposer") + panic("no return value specified for GetProposerAtHeight") } var r0 *types.Sequencer @@ -497,7 +497,14 @@ func (_m *MockClientI) GetProposer() *types.Sequencer { } } - return r0 + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // MockClientI_GetProposer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetProposer' diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index 6f9820d62..58c19ac9c 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "slices" "strconv" "strings" "time" @@ -50,7 +49,6 @@ type Client struct { rollappQueryClient rollapptypes.QueryClient sequencerQueryClient sequencertypes.QueryClient protoCodec *codec.ProtoCodec - proposer types.Sequencer retryAttempts uint retryMinDelay time.Duration retryMaxDelay time.Duration @@ -312,49 +310,53 @@ func (c *Client) GetLatestFinalizedHeight() (uint64, error) { return res.Height, nil } -// GetProposer implements settlement.ClientI. -func (c *Client) GetProposer() *types.Sequencer { - // return cached proposer - if !c.proposer.IsEmpty() { - return &c.proposer - } - +// GetProposerAtHeight return the proposer at height. +// In case of negative height, it will return the latest proposer. +func (c *Client) GetProposerAtHeight(height int64) (*types.Sequencer, error) { + // Get all sequencers to find the proposer address seqs, err := c.GetBondedSequencers() if err != nil { - c.logger.Error("GetBondedSequencers", "error", err) - return nil + return nil, fmt.Errorf("get bonded sequencers: %w", err) } + // Get either latest proposer or proposer at height var proposerAddr string - err = c.RunWithRetry(func() error { - reqProposer := &sequencertypes.QueryGetProposerByRollappRequest{ - RollappId: c.rollappId, - } - res, err := c.sequencerQueryClient.GetProposerByRollapp(c.ctx, reqProposer) - if err == nil { - proposerAddr = res.ProposerAddr - return nil + if height < 0 { + proposerAddr, err = c.getLatestProposer() + if err != nil { + return nil, fmt.Errorf("get latest proposer: %w", err) } - if status.Code(err) == codes.NotFound { - return nil + } else { + // Get the state info for the relevant height and get address from there + res, err := c.GetBatchAtHeight(uint64(height)) + // if case of height not found, it may be because it didn't arrive to the hub yet. + // In that case we want to return the current proposer. + if err != nil { + // If batch not found, fallback to latest proposer + if errors.Is(err, gerrc.ErrNotFound) { + proposerAddr, err = c.getLatestProposer() + if err != nil { + return nil, fmt.Errorf("get latest proposer: %w", err) + } + } else { + return nil, fmt.Errorf("get batch at height: %w", err) + } + } else { + proposerAddr = res.Batch.Sequencer } - return err - }) - if err != nil { - c.logger.Error("GetProposer", "error", err) - return nil } - // find the sequencer with the proposer address - index := slices.IndexFunc(seqs, func(seq types.Sequencer) bool { - return seq.SettlementAddress == proposerAddr - }) - // will return nil if the proposer is not set - if index == -1 { - return nil + if proposerAddr == "" { + return nil, fmt.Errorf("proposer address is empty") } - c.proposer = seqs[index] - return &seqs[index] + + // Find and return the matching sequencer + for _, seq := range seqs { + if seq.SettlementAddress == proposerAddr { + return &seq, nil + } + } + return nil, fmt.Errorf("proposer not found") } // GetSequencerByAddress returns a sequencer by its address. @@ -642,3 +644,25 @@ func (c *Client) pollForBatchInclusion(batchEndHeight uint64) (bool, error) { return latestBatch.Batch.EndHeight == batchEndHeight, nil } + +func (c *Client) getLatestProposer() (string, error) { + var proposerAddr string + err := c.RunWithRetry(func() error { + reqProposer := &sequencertypes.QueryGetProposerByRollappRequest{ + RollappId: c.rollappId, + } + res, err := c.sequencerQueryClient.GetProposerByRollapp(c.ctx, reqProposer) + if err == nil { + proposerAddr = res.ProposerAddr + return nil + } + if status.Code(err) == codes.NotFound { + return nil + } + return err + }) + if err != nil { + return "", err + } + return proposerAddr, nil +} diff --git a/settlement/grpc/grpc.go b/settlement/grpc/grpc.go index 367cff312..f10437ec2 100644 --- a/settlement/grpc/grpc.go +++ b/settlement/grpc/grpc.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "fmt" "path/filepath" "strconv" "sync/atomic" @@ -242,29 +243,27 @@ func (c *Client) GetBatchAtHeight(h uint64) (*settlement.ResultRetrieveBatch, er return nil, gerrc.ErrNotFound } -// GetProposer implements settlement.ClientI. -func (c *Client) GetProposer() *types.Sequencer { +// GetProposerAtHeight implements settlement.ClientI. +func (c *Client) GetProposerAtHeight(height int64) (*types.Sequencer, error) { pubKeyBytes, err := hex.DecodeString(c.ProposerPubKey) if err != nil { - return nil + return nil, fmt.Errorf("decode proposer pubkey: %w", err) } var pubKey cryptotypes.PubKey = &ed25519.PubKey{Key: pubKeyBytes} tmPubKey, err := cryptocodec.ToTmPubKeyInterface(pubKey) if err != nil { - c.logger.Error("Error converting to tendermint pubkey", "err", err) - return nil + return nil, fmt.Errorf("convert to tendermint pubkey: %w", err) } settlementAddr, err := bech32.ConvertAndEncode(addressPrefix, pubKeyBytes) if err != nil { - c.logger.Error("Error converting pubkey to settlement address", "err", err) - return nil + return nil, fmt.Errorf("convert pubkey to settlement address: %w", err) } return types.NewSequencer( tmPubKey, settlementAddr, settlementAddr, []string{}, - ) + ), nil } // GetSequencerByAddress returns all sequencer information by its address. Not implemented since it will not be used in grpc SL @@ -279,7 +278,11 @@ func (c *Client) GetAllSequencers() ([]types.Sequencer, error) { // GetBondedSequencers implements settlement.ClientI. func (c *Client) GetBondedSequencers() ([]types.Sequencer, error) { - return []types.Sequencer{*c.GetProposer()}, nil + proposer, err := c.GetProposerAtHeight(-1) + if err != nil { + return nil, fmt.Errorf("get proposer at height: %w", err) + } + return []types.Sequencer{*proposer}, nil } // GetNextProposer implements settlement.ClientI. @@ -337,8 +340,13 @@ func (c *Client) convertBatchtoSettlementBatch(batch *types.Batch, daResult *da. bds = append(bds, bd) } + proposer, err := c.GetProposerAtHeight(0) + if err != nil { + panic(err) + } + settlementBatch := &settlement.Batch{ - Sequencer: c.GetProposer().SettlementAddress, + Sequencer: proposer.SettlementAddress, StartHeight: batch.StartHeight(), EndHeight: batch.EndHeight(), MetaData: &settlement.BatchMetaData{ diff --git a/settlement/local/local.go b/settlement/local/local.go index 3de07099d..989bac2d2 100644 --- a/settlement/local/local.go +++ b/settlement/local/local.go @@ -203,29 +203,27 @@ func (c *Client) GetBatchAtHeight(h uint64) (*settlement.ResultRetrieveBatch, er return nil, gerrc.ErrNotFound // TODO: need to return a cosmos specific error? } -// GetProposer implements settlement.ClientI. -func (c *Client) GetProposer() *types.Sequencer { +// GetProposerAtHeight implements settlement.ClientI. +func (c *Client) GetProposerAtHeight(height int64) (*types.Sequencer, error) { pubKeyBytes, err := hex.DecodeString(c.ProposerPubKey) if err != nil { - return nil + return nil, fmt.Errorf("decode proposer pubkey: %w", err) } var pubKey cryptotypes.PubKey = &ed25519.PubKey{Key: pubKeyBytes} tmPubKey, err := cryptocodec.ToTmPubKeyInterface(pubKey) if err != nil { - c.logger.Error("Error converting to tendermint pubkey", "err", err) - return nil + return nil, fmt.Errorf("convert to tendermint pubkey: %w", err) } settlementAddr, err := bech32.ConvertAndEncode(addressPrefix, pubKeyBytes) if err != nil { - c.logger.Error("Error converting pubkey to settlement address", "err", err) - return nil + return nil, fmt.Errorf("convert pubkey to settlement address: %w", err) } return types.NewSequencer( tmPubKey, settlementAddr, settlementAddr, []string{}, - ) + ), nil } // GetSequencerByAddress returns all sequencer information by its address. Not implemented since it will not be used in mock SL @@ -240,7 +238,11 @@ func (c *Client) GetAllSequencers() ([]types.Sequencer, error) { // GetBondedSequencers implements settlement.ClientI. func (c *Client) GetBondedSequencers() ([]types.Sequencer, error) { - return []types.Sequencer{*c.GetProposer()}, nil + proposer, err := c.GetProposerAtHeight(-1) + if err != nil { + return nil, fmt.Errorf("get proposer at height: %w", err) + } + return []types.Sequencer{*proposer}, nil } // GetNextProposer implements settlement.ClientI. @@ -303,9 +305,13 @@ func (c *Client) convertBatchToSettlementBatch(batch *types.Batch, daResult *da. } bds = append(bds, bd) } + proposer, err := c.GetProposerAtHeight(-1) + if err != nil { + panic(err) + } settlementBatch := &settlement.Batch{ - Sequencer: c.GetProposer().SettlementAddress, + Sequencer: proposer.SettlementAddress, StartHeight: batch.StartHeight(), EndHeight: batch.EndHeight(), MetaData: &settlement.BatchMetaData{ diff --git a/settlement/local/local_test.go b/settlement/local/local_test.go index bca3a9b7e..6a1c5deb8 100644 --- a/settlement/local/local_test.go +++ b/settlement/local/local_test.go @@ -36,7 +36,8 @@ func TestGetSequencers(t *testing.T) { assert.Equal(1, len(sequencers)) assert.Equal(pubKeybytes, sequencers[0].PubKey().Bytes()) - proposer := sllayer.GetProposer() + proposer, err := sllayer.GetProposerAtHeight(-1) + require.NoError(err) require.NotNil(proposer) assert.Equal(pubKeybytes, proposer.PubKey().Bytes()) } diff --git a/settlement/settlement.go b/settlement/settlement.go index 953b894f6..98d54bc85 100644 --- a/settlement/settlement.go +++ b/settlement/settlement.go @@ -89,8 +89,8 @@ type ClientI interface { GetAllSequencers() ([]types.Sequencer, error) // GetBondedSequencers returns the list of the bonded sequencers for this rollapp. GetBondedSequencers() ([]types.Sequencer, error) - // GetProposer returns the current proposer for this chain. - GetProposer() *types.Sequencer + // GetProposerAtHeight returns the current proposer for this chain. + GetProposerAtHeight(height int64) (*types.Sequencer, error) // GetNextProposer returns the next proposer for this chain in case of a rotation. // If no rotation is in progress, it should return nil. GetNextProposer() (*types.Sequencer, error)