Skip to content

Commit

Permalink
retry connection with backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
iansuvak committed Jan 24, 2025
1 parent 110508b commit 10ecb86
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 34 deletions.
2 changes: 0 additions & 2 deletions peers/app_request_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,7 @@ func (n *appRequestNetwork) ConnectToCanonicalValidators(subnetID ids.ID) (*Conn
}

peerInfo := n.network.PeerInfo(nil)

connectedPeers := set.NewSet[ids.NodeID](len(nodeIDs))

for _, peer := range peerInfo {
if nodeIDs.Contains(peer.ID) {
connectedPeers.Add(peer.ID)
Expand Down
71 changes: 39 additions & 32 deletions signature-aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ type blsSignatureBuf [bls.SignatureLen]byte
const (
// Maximum amount of time to spend waiting (in addition to network round trip time per attempt)
// during relayer signature query routine
signatureRequestTimeout = 20 * time.Second
signatureRequestTimeout = 5 * time.Second
// Maximum amount of time to spend waiting for a connection to a quorum of validators for
// a given subnetID
connectToValidatorsTimeout = 5 * time.Second
)

var (
Expand Down Expand Up @@ -118,39 +121,43 @@ func (s *SignatureAggregator) CreateSignedMessage(
zap.Stringer("signingSubnet", signingSubnet),
)

connectedValidators, err := s.network.ConnectToCanonicalValidators(signingSubnet)
if err != nil {
msg := "Failed to connect to canonical validators"
s.logger.Error(
msg,
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Error(err),
)
s.metrics.FailuresToGetValidatorSet.Inc()
return nil, fmt.Errorf("%s: %w", msg, err)
}
s.logger.Debug("Connected to canonical validators", zap.String("warpMessageID", unsignedMessage.ID().String()))
s.metrics.ConnectedStakeWeightPercentage.WithLabelValues(
signingSubnet.String(),
).Set(
float64(connectedValidators.ConnectedWeight) /
float64(connectedValidators.TotalValidatorWeight) * 100,
)

if !utils.CheckStakeWeightExceedsThreshold(
big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight),
connectedValidators.TotalValidatorWeight,
quorumPercentage,
) {
s.logger.Error(
"Failed to connect to a threshold of stake",
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
zap.Uint64("quorumPercentage", quorumPercentage),
var connectedValidators *peers.ConnectedCanonicalValidators
connectOp := func() error {
connectedValidators, err = s.network.ConnectToCanonicalValidators(signingSubnet)
if err != nil {
msg := "Failed to connect to canonical validators"
s.logger.Error(
msg,
zap.String("warpMessageID", unsignedMessage.ID().String()),
zap.Error(err),
)
s.metrics.FailuresToGetValidatorSet.Inc()
return fmt.Errorf("%s: %w", msg, err)
}
s.logger.Debug("Connected to canonical validators", zap.String("warpMessageID", unsignedMessage.ID().String()))
s.metrics.ConnectedStakeWeightPercentage.WithLabelValues(
signingSubnet.String(),
).Set(
float64(connectedValidators.ConnectedWeight) /
float64(connectedValidators.TotalValidatorWeight) * 100,
)
s.metrics.FailuresToConnectToSufficientStake.Inc()
return nil, ErrNotEnoughConnectedStake
if !utils.CheckStakeWeightExceedsThreshold(
big.NewInt(0).SetUint64(connectedValidators.ConnectedWeight),
connectedValidators.TotalValidatorWeight,
quorumPercentage,
) {
s.logger.Error(
"Failed to connect to a threshold of stake",
zap.Uint64("connectedWeight", connectedValidators.ConnectedWeight),
zap.Uint64("totalValidatorWeight", connectedValidators.TotalValidatorWeight),
zap.Uint64("quorumPercentage", quorumPercentage),
)
s.metrics.FailuresToConnectToSufficientStake.Inc()
return ErrNotEnoughConnectedStake
}
return nil
}
err = utils.WithRetriesTimeout(s.logger, connectOp, signatureRequestTimeout)

accumulatedSignatureWeight := big.NewInt(0)

Expand Down

0 comments on commit 10ecb86

Please # to comment.