Skip to content

Commit

Permalink
Merge pull request Enhance Leader Rotation Logic to Address Edge Case…
Browse files Browse the repository at this point in the history
…s in Leader Selection #4798

Enhance Leader Rotation Logic to Address Edge Cases in Leader Selection
  • Loading branch information
sophoah authored Nov 27, 2024
2 parents 8172901 + d5f6f9a commit 6e7b891
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 12 deletions.
10 changes: 8 additions & 2 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,11 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK
return defaultKey
}
const blocksCountAliveness = 4
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v", epoch.Uint64(), bc.Config().IsLeaderRotationInternalValidators(epoch), bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch))
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v rotation v2: %v",
epoch.Uint64(),
bc.Config().IsLeaderRotationInternalValidators(epoch),
bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch),
bc.Config().IsLeaderRotationV2Epoch(epoch))
ss, err := bc.ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state")
Expand Down Expand Up @@ -758,7 +762,9 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int, defaultKey *bls.PublicK
)

for i := 0; i < len(committee.Slots); i++ {
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
if bc.Config().IsLeaderRotationV2Epoch(epoch) {
wasFound, next = consensus.decider.NthNextValidatorV2(committee.Slots, leader, offset)
} else if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.decider.NthNextValidator(committee.Slots, leader, offset)
} else {
wasFound, next = consensus.decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, offset)
Expand Down
219 changes: 211 additions & 8 deletions consensus/quorum/quorom_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package quorum

import (
"fmt"
"math/big"
"strings"
"testing"
"time"

bls_core "github.com/harmony-one/bls/ffi/go/bls"
harmony_bls "github.com/harmony-one/harmony/crypto/bls"
Expand Down Expand Up @@ -549,32 +551,233 @@ func TestInvalidAggregateSig(test *testing.T) {
}
}

func TestCIdentities_NthNextValidatorHmy(t *testing.T) {
address := []common.Address{
common.HexToAddress("0x1"),
common.HexToAddress("0x2"),
common.HexToAddress("0x3"),
func createTestCIdentities(numAddresses int, keysPerAddress int) (*cIdentities, shard.SlotList, []harmony_bls.PublicKeyWrapper) {
testAddresses := make([]common.Address, 0, numAddresses*numAddresses)
for i := int(0); i < numAddresses; i++ {
h := fmt.Sprintf("0x%040x", i)
addr := common.HexToAddress(h)
testAddresses = append(testAddresses, addr)
}
slots := shard.SlotList{}
list := []harmony_bls.PublicKeyWrapper{}
for i := 0; i < 3; i++ {
for j := 0; j < 3; j++ {
// generate slots and public keys
for i := 0; i < numAddresses; i++ {
for j := 0; j < keysPerAddress; j++ { // keys per address
blsKey := harmony_bls.RandPrivateKey()
wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}
wrapper.Bytes.FromLibBLSPublicKey(wrapper.Object)

slots = append(slots, shard.Slot{
EcdsaAddress: address[i%3],
EcdsaAddress: testAddresses[i],
BLSPublicKey: wrapper.Bytes,
EffectiveStake: nil,
})
list = append(list, wrapper)
}
}
// initialize and update cIdentities
c := newCIdentities()
c.UpdateParticipants(list, []bls.PublicKeyWrapper{})
return c, slots, list
}

func TestCIdentities_NthNextValidatorHmy(t *testing.T) {
c, slots, list := createTestCIdentities(3, 3)

found, key := c.NthNextValidator(slots, &list[0], 1)
require.Equal(t, true, found)
// because we skip 3 keys of current validator
require.Equal(t, 3, c.IndexOf(key.Bytes))
}

func TestCIdentities_NthNextValidatorFailedEdgeCase1(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Logf("Recovered from panic as expected: %v", r)
} else {
t.Errorf("Expected a panic when next is 0, but no panic occurred")
}
}()

// create test identities and slots
c, slots, _ := createTestCIdentities(3, 3)

// create a public key wrapper that doesn't exist in the identities
t.Log("creating a random public key wrapper not present in test identities")
blsKey := harmony_bls.RandPrivateKey()
wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}

// Edge Case: Trigger NthNextValidator with next=0, which should cause a panic
t.Log("Calling NthNextValidator with next=0 to test panic handling")
c.NthNextValidator(slots, &wrapper, 0)
}

func TestCIdentities_NthNextValidatorFailedEdgeCase2(t *testing.T) {
// create test identities and slots
c, slots, list := createTestCIdentities(1, 3)

done := make(chan bool)

go func() {
// possible infinite loop, it will time out
c.NthNextValidator(slots, &list[1], 1)

done <- true
}()

select {
case <-done:
t.Error("Expected a timeout, but successfully calculated next leader")

case <-time.After(5 * time.Second):
t.Log("Test timed out, possible infinite loop")
}
}

func TestCIdentities_NthNextValidatorFailedEdgeCase3(t *testing.T) {
// create 3 test addresses
testAddresses := make([]common.Address, 0, 3)
for i := int(0); i < 3; i++ {
h := fmt.Sprintf("0x%040x", i)
addr := common.HexToAddress(h)
testAddresses = append(testAddresses, addr)
}
slots := shard.SlotList{}
list := []harmony_bls.PublicKeyWrapper{}

// First add 4 keys for first address
for i := 0; i < 4; i++ {
blsKey := harmony_bls.RandPrivateKey()
wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}
wrapper.Bytes.FromLibBLSPublicKey(wrapper.Object)

slots = append(slots, shard.Slot{
EcdsaAddress: testAddresses[0],
BLSPublicKey: wrapper.Bytes,
EffectiveStake: nil,
})
list = append(list, wrapper)
}

// Then add 1 key for next two addresses
for i := 1; i < 3; i++ {
blsKey := harmony_bls.RandPrivateKey()
wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}
wrapper.Bytes.FromLibBLSPublicKey(wrapper.Object)

slots = append(slots, shard.Slot{
EcdsaAddress: testAddresses[i],
BLSPublicKey: wrapper.Bytes,
EffectiveStake: nil,
})
list = append(list, wrapper)
}

// initialize and update cIdentities
c := newCIdentities()
c.UpdateParticipants(list, []bls.PublicKeyWrapper{})

// current key is the first one.
found, key := c.NthNextValidator(slots, &list[0], 1)
require.Equal(t, true, found)
// because we skip 4 keys of first validator, the next validator key index is 4 (starts from 0)
// but it returns 5 and skips second validator (key index: 4)
require.Equal(t, 5, c.IndexOf(key.Bytes))
t.Log("second validator were skipped")
}

func TestCIdentities_NthNextValidatorV2Hmy(t *testing.T) {
c, slots, list := createTestCIdentities(3, 3)

found, key := c.NthNextValidatorV2(slots, &list[0], 1)
require.Equal(t, true, found)
// because we skip 3 keys of current validator
require.Equal(t, 3, c.IndexOf(key.Bytes))
}

func TestCIdentities_NthNextValidatorV2EdgeCase1(t *testing.T) {
// create test identities and slots
c, slots, _ := createTestCIdentities(3, 3)

// create a public key wrapper that doesn't exist in the identities
t.Log("creating a random public key wrapper not present in test identities")
blsKey := harmony_bls.RandPrivateKey()
wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}

// Edge Case: Trigger NthNextValidator with next=0, which should cause a panic
t.Log("Calling NthNextValidatorV2 with next=0 to test panic handling")
found, key := c.NthNextValidatorV2(slots, &wrapper, 0)

require.Equal(t, true, found)
require.Equal(t, 0, c.IndexOf(key.Bytes))
}

func TestCIdentities_NthNextValidatorV2EdgeCase2(t *testing.T) {
// create test identities and slots
c, slots, list := createTestCIdentities(1, 3)

done := make(chan bool)

go func() {
c.NthNextValidatorV2(slots, &list[1], 1)

done <- true
}()

select {
case <-done:
t.Log("Test completed successfully ")
case <-time.After(5 * time.Second):
t.Error("timeout, possible infinite loop")
}
}

func TestCIdentities_NthNextValidatorV2EdgeCase3(t *testing.T) {
// create 3 test addresses
testAddresses := make([]common.Address, 0, 3)
for i := int(0); i < 3; i++ {
h := fmt.Sprintf("0x%040x", i)
addr := common.HexToAddress(h)
testAddresses = append(testAddresses, addr)
}
slots := shard.SlotList{}
list := []harmony_bls.PublicKeyWrapper{}

// First add 4 keys for first address
for i := 0; i < 4; i++ {
blsKey := harmony_bls.RandPrivateKey()
wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}
wrapper.Bytes.FromLibBLSPublicKey(wrapper.Object)

slots = append(slots, shard.Slot{
EcdsaAddress: testAddresses[0],
BLSPublicKey: wrapper.Bytes,
EffectiveStake: nil,
})
list = append(list, wrapper)
}

// Then add 1 key for next two addresses
for i := 1; i < 3; i++ {
blsKey := harmony_bls.RandPrivateKey()
wrapper := harmony_bls.PublicKeyWrapper{Object: blsKey.GetPublicKey()}
wrapper.Bytes.FromLibBLSPublicKey(wrapper.Object)

slots = append(slots, shard.Slot{
EcdsaAddress: testAddresses[i],
BLSPublicKey: wrapper.Bytes,
EffectiveStake: nil,
})
list = append(list, wrapper)
}

// initialize and update cIdentities
c := newCIdentities()
c.UpdateParticipants(list, []bls.PublicKeyWrapper{})

// current key is the first one.
found, key := c.NthNextValidatorV2(slots, &list[0], 1)
require.Equal(t, true, found)
// because we skip 4 keys of first validator, the next validator key index is 4 (starts from 0)
require.Equal(t, 4, c.IndexOf(key.Bytes))
}
42 changes: 41 additions & 1 deletion consensus/quorum/quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ParticipantTracker interface {
ParticipantsCount() int64
// NthNextValidator returns key for next validator. It assumes external validators and leader rotation.
NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper)
FirstParticipant(shardingconfig.Instance) *bls.PublicKeyWrapper
UpdateParticipants(pubKeys, allowlist []bls.PublicKeyWrapper)
Expand Down Expand Up @@ -217,7 +218,46 @@ func (s *cIdentities) NthNext(pubKey *bls.PublicKeyWrapper, next int) (bool, *bl
return found, &s.publicKeys[idx]
}

// NthNextValidator return the Nth next pubkey nodes, but from another validator.
// NthNextValidatorV2 returns the Nth next pubkey nodes, but from another validator.
func (s *cIdentities) NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
if len(s.publicKeys) == 0 || next < 0 {
return false, pubKey
}

publicToAddress := make(map[bls.SerializedPublicKey]common.Address, len(slotList))
for _, slot := range slotList {
publicToAddress[slot.BLSPublicKey] = slot.EcdsaAddress
}

pubKeyIndex := s.IndexOf(pubKey.Bytes)
if pubKeyIndex == -1 {
utils.Logger().Error().
Str("key", pubKey.Bytes.Hex()).
Msg("[NthNextValidator] pubKey not found")
}

if pubKeyIndex == -1 && next == 0 {
return true, &s.publicKeys[0]
}

numKeys := len(s.publicKeys)
attempts := 0

for {
idx := (pubKeyIndex + attempts + next) % numKeys
if attempts > numKeys {
utils.Logger().Warn().
Str("key", pubKey.Bytes.Hex()).
Msg("[NthNextValidator] Could not find a different validator within limit")
return false, pubKey
}
if publicToAddress[s.publicKeys[idx].Bytes] != publicToAddress[pubKey.Bytes] {
return true, &s.publicKeys[idx]
}
attempts++
}
}

func (s *cIdentities) NthNextValidator(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
found := false

Expand Down
6 changes: 6 additions & 0 deletions consensus/quorum/thread_safe_decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (a threadSafeDeciderImpl) NthNextValidator(slotList shard.SlotList, pubKey
return a.decider.NthNextValidator(slotList, pubKey, next)
}

func (a threadSafeDeciderImpl) NthNextValidatorV2(slotList shard.SlotList, pubKey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
return a.decider.NthNextValidator(slotList, pubKey, next)
}

func (a threadSafeDeciderImpl) NthNextHmy(instance shardingconfig.Instance, pubkey *bls.PublicKeyWrapper, next int) (bool, *bls.PublicKeyWrapper) {
a.mu.Lock()
defer a.mu.Unlock()
Expand Down
7 changes: 6 additions & 1 deletion consensus/view_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,12 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
var wasFound bool
var next *bls.PublicKeyWrapper
if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) {
if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
if blockchain.Config().IsLeaderRotationV2Epoch(epoch) {
wasFound, next = consensus.decider.NthNextValidatorV2(
committee.Slots,
lastLeaderPubKey,
gap)
} else if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.decider.NthNextValidator(
committee.Slots,
lastLeaderPubKey,
Expand Down
Loading

0 comments on commit 6e7b891

Please # to comment.