Skip to content

Commit

Permalink
v0.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Aug 25, 2023
1 parent 2060ab6 commit a83a29e
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 330 deletions.
10 changes: 4 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ Build the relayer by running the included build script:
```
./scripts/build.sh
```

Build a Docker image by running the included build script:
```
./scripts/build-local-image.sh
```
### Running
The relayer binary accepts a path to a JSON configuration file as the sole argument. Command line configuration arguments are not currently supported.
```
Expand All @@ -22,7 +17,7 @@ The relayer binary accepts a path to a JSON configuration file as the sole argum

## Architecture
---
**Note:** The relayer in its current state supports Teleporter messages between `subnet-evm` instances. A handful of abstractions have been added to make the relayer extensible to other Warp message formats and VM types, but this work is ongoing.
**Note:** The relayer in its current state supports Teleporter messages between subnet-evm instances. A handful of abstractions have been added to make the relayer extensible to other Warp message formats and VM types, but this work is ongoing.
### Components
The relayer consists of the following components:
- At the global level:
Expand All @@ -37,3 +32,6 @@ The relayer consists of the following components:
<div align="center">
<img src="resources/relayer-diagram.png?raw=true">
</div>

## v0.1.x versus v0.2.x
`v0.2.x` tracks `main` and represents the latest version of `awm-relayer`, which stays up-to-date with `avalanchego` and `subnet-evm`. However, while Avalanche Warp Messaging is still under active development, `v0.1.x` uses stable versions of `avalanchego` and `subnet-evm` to avoid breaking changes. The `awm-relayer` deployment managed by Ava Labs, along with the Fuji subnets Amplify, Bulletin, and Conduit, are compatible with the stable `v0.1.x` but **not** `v0.2.x`. Any interaction with those subnets should be done with the correct version. Interaction with local networks (such as in the integration test suite in `teleporter`) may use `v0.2.x`.
21 changes: 6 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module github.com/ava-labs/awm-relayer
go 1.18

require (
github.com/ava-labs/avalanchego v1.10.8
github.com/ava-labs/subnet-evm v0.5.4-0.20230816140336-9f21235c17b3
github.com/ethereum/go-ethereum v1.12.0
github.com/ava-labs/avalanchego v1.10.2
github.com/ava-labs/subnet-evm v0.5.2-warp-rc.0
github.com/ethereum/go-ethereum v1.10.26
github.com/golang/mock v1.6.0
github.com/prometheus/client_golang v1.16.0
github.com/spf13/pflag v1.0.5
Expand All @@ -15,23 +15,14 @@ require (
)

require (
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rjeczalik/notify v0.9.3 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
220 changes: 14 additions & 206 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions messages/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
// for each message protocol, and performs the sending to the destination chain.
type MessageManager interface {
// ShouldSendMessage returns true if the message should be sent to the destination chain
ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error)
ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo) (bool, error)
// SendMessage sends the signed message to the destination chain. The payload parsed according to
// the VM rules is also passed in, since MessageManager does not assume any particular VM
SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error
SendMessage(signedMessage *warp.Message, parsedVmPayload []byte) error
}

// NewMessageManager constructs a MessageManager for a particular message protocol, defined by the message protocol address and config
Expand Down
7 changes: 5 additions & 2 deletions messages/teleporter/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewMessageManager(
}

// ShouldSendMessage returns true if the message should be sent to the destination chain
func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) {
func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo) (bool, error) {
// Unpack the teleporter message and add it to the cache
teleporterMessage, err := unpackTeleporterMessage(warpMessageInfo.WarpPayload)
if err != nil {
Expand All @@ -82,6 +82,7 @@ func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageI
return false, err
}

destinationChainID := warpMessageInfo.WarpUnsignedMessage.DestinationChainID
// Get the correct destination client from the global map
destinationClient, ok := m.destinationClients[destinationChainID]
if !ok {
Expand All @@ -101,7 +102,7 @@ func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageI

// SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage method of the Teleporter contract,
// and dispatches transaction construction and broadcast to the destination client
func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error {
func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayload []byte) error {
var (
teleporterMessage *TeleporterMessage
ok bool
Expand All @@ -123,6 +124,8 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa
}
}

destinationChainID := signedMessage.UnsignedMessage.DestinationChainID

m.logger.Info(
"Sending message to destination chain",
zap.String("destinationChainID", destinationChainID.String()),
Expand Down
98 changes: 63 additions & 35 deletions relayer/message_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/platformvm/signer"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/awm-relayer/messages"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/temp"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
Expand Down Expand Up @@ -49,26 +51,23 @@ var (
type messageRelayer struct {
relayer *Relayer
warpMessage *warp.UnsignedMessage
destinationChainID ids.ID
messageResponseChan chan message.InboundMessage
logger logging.Logger
messageCreator message.Creator
metrics *MessageRelayerMetrics
messageCreator message.Creator
}

func newMessageRelayer(
logger logging.Logger,
metrics *MessageRelayerMetrics,
relayer *Relayer,
warpMessage *warp.UnsignedMessage,
destinationChainID ids.ID,
messageResponseChan chan message.InboundMessage,
messageCreator message.Creator,
) *messageRelayer {
return &messageRelayer{
relayer: relayer,
warpMessage: warpMessage,
destinationChainID: destinationChainID,
messageResponseChan: messageResponseChan,
logger: logger,
metrics: metrics,
Expand All @@ -77,7 +76,7 @@ func newMessageRelayer(
}

func (r *messageRelayer) run(warpMessageInfo *vmtypes.WarpMessageInfo, requestID uint32, messageManager messages.MessageManager) {
shouldSend, err := messageManager.ShouldSendMessage(warpMessageInfo, r.destinationChainID)
shouldSend, err := messageManager.ShouldSendMessage(warpMessageInfo)
if err != nil {
r.logger.Error(
"Failed to check if message should be sent",
Expand Down Expand Up @@ -110,7 +109,7 @@ func (r *messageRelayer) run(warpMessageInfo *vmtypes.WarpMessageInfo, requestID
// create signed message latency (ms)
r.setCreateSignedMessageLatencyMS(float64(time.Since(startCreateSignedMessageTime).Milliseconds()))

err = messageManager.SendMessage(signedMessage, warpMessageInfo.WarpPayload, r.destinationChainID)
err = messageManager.SendMessage(signedMessage, warpMessageInfo.WarpPayload)
if err != nil {
r.logger.Error(
"Failed to send warp message",
Expand All @@ -122,7 +121,7 @@ func (r *messageRelayer) run(warpMessageInfo *vmtypes.WarpMessageInfo, requestID
}
r.logger.Info(
"Finished relaying message to destination chain",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
)
r.incSuccessfulRelayMessageCount()
}
Expand All @@ -131,7 +130,7 @@ func (r *messageRelayer) run(warpMessageInfo *vmtypes.WarpMessageInfo, requestID
func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, error) {
r.logger.Info(
"Starting relayer routine",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
)

// Get the current canonical validator set of the source subnet.
Expand Down Expand Up @@ -182,7 +181,7 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
if err != nil {
r.logger.Error(
"Failed to marshal request bytes",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
zap.Error(err),
)
return nil, err
Expand All @@ -207,7 +206,7 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
r.logger.Debug(
"Relayer collecting signatures from peers.",
zap.Int("attempt", attempt),
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
zap.Int("validatorSetSize", len(validatorSet)),
zap.Int("signatureMapSize", len(signatureMap)),
zap.Int("responsesExpected", responsesExpected),
Expand Down Expand Up @@ -318,7 +317,7 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
if err != nil {
r.logger.Error(
"Failed to aggregate signature.",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
zap.Error(err),
)
return nil, err
Expand Down Expand Up @@ -347,7 +346,7 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
if signedMsg != nil {
r.logger.Info(
"Created signed message.",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
)
return signedMsg, nil
}
Expand All @@ -367,23 +366,42 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
r.logger.Warn(
"Failed to collect a threshold of signatures",
zap.Int("attempts", maxRelayerQueryAttempts),
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
)
return nil, errNotEnoughSignatures
}

func (r *messageRelayer) getCurrentCanonicalValidatorSet() ([]*warp.Validator, uint64, error) {
var (
signingSubnet ids.ID
err error
)
// GetCurrentValidators currently does not return BLS public key and proof of possession for subnets.
// That information is only included for primary network validators, so we first look up the current primary
// network validators and index them on their node ID. Then we lookup the subnet validators, and match
// their BLS public key and proof of possession with that of the primary network validator with the same node ID.
// TODO: Check with platform team if the "signer" information could be returned for the specific subnets' validators.
primaryNetworkValidators, err := r.relayer.pChainClient.GetCurrentValidators(context.Background(), ids.Empty, nil)
if err != nil {
r.logger.Error(
"Failed to get current primary network validator set",
zap.Error(err),
)
return nil, 0, err
}
primaryNetworkSigners := make(map[ids.NodeID]*signer.ProofOfPossession)
for _, validator := range primaryNetworkValidators {
r.logger.Debug(
"Set primary network BLS signer for node",
zap.String("nodeID", validator.NodeID.String()),
)
primaryNetworkSigners[validator.NodeID] = validator.Signer
}

var signingSubnet ids.ID
if r.relayer.sourceSubnetID == constants.PrimaryNetworkID {
// If the message originates from the primary subnet, then we instead "self sign" the message using the validators of the destination subnet.
signingSubnet, err = r.relayer.pChainClient.ValidatedBy(context.Background(), r.destinationChainID)
signingSubnet, err = r.relayer.pChainClient.ValidatedBy(context.Background(), r.warpMessage.DestinationChainID)
if err != nil {
r.logger.Error(
"failed to get validating subnet for destination chain",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
zap.Error(err),
)
return nil, 0, err
Expand All @@ -392,23 +410,33 @@ func (r *messageRelayer) getCurrentCanonicalValidatorSet() ([]*warp.Validator, u
// Otherwise, the source subnet signs the message.
signingSubnet = r.relayer.sourceSubnetID
}

height, err := r.relayer.pChainClient.GetHeight(context.Background())
subnetValidators, err := r.relayer.pChainClient.GetCurrentValidators(context.Background(), signingSubnet, nil)
if err != nil {
r.logger.Error(
"Failed to get P-Chain height",
"Failed to get the current validator set",
zap.String("subnetID", r.relayer.sourceSubnetID.String()),
zap.Error(err),
)
return nil, 0, err
}

// Get the current canonical validator set of the source subnet.
canonicalSubnetValidators, totalValidatorWeight, err := warp.GetCanonicalValidatorSet(
context.Background(),
r.relayer.canonicalValidatorClient,
height,
signingSubnet,
)
for i := 0; i < len(subnetValidators); i++ {
if primaryNetworkSigner, ok := primaryNetworkSigners[subnetValidators[i].NodeID]; ok {
subnetValidators[i].Signer = primaryNetworkSigner
r.logger.Debug(
"Set subnet BLS signer for node",
zap.String("nodeID", subnetValidators[i].NodeID.String()),
)
} else {
r.logger.Debug(
"Missing BLS signer info for node",
zap.String("nodeID", subnetValidators[i].NodeID.String()),
)
}
}
// Get the canonical validator set. The returned slice consists of virtual validators, each represented by a single BLS pubkey and composed of multiple individual nodes
// This is because BLS keys are not unique, and may be shared by multiple nodeIDs, which are the entities we need to query
canonicalSubnetValidators, totalValidatorWeight, err := temp.GetCanonicalValidatorSet(subnetValidators)
if err != nil {
r.logger.Error(
"Failed to get the canonical subnet validator set",
Expand Down Expand Up @@ -460,7 +488,7 @@ func (r *messageRelayer) isValidSignatureResponse(
r.logger.Debug(
"Response contained an empty signature",
zap.String("nodeID", response.NodeID().String()),
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
)
return blsSignatureBuf{}, false
}
Expand All @@ -469,15 +497,15 @@ func (r *messageRelayer) isValidSignatureResponse(
if err != nil {
r.logger.Debug(
"Failed to create signature from response",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
)
return blsSignatureBuf{}, false
}

if !bls.Verify(pubKey, sig, r.warpMessage.Bytes()) {
r.logger.Debug(
"Failed verification for signature",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.String("destinationChainID", r.warpMessage.DestinationChainID.String()),
)
return blsSignatureBuf{}, false
}
Expand Down Expand Up @@ -519,15 +547,15 @@ func (r *messageRelayer) aggregateSignatures(signatureMap map[int]blsSignatureBu
func (r *messageRelayer) incSuccessfulRelayMessageCount() {
r.metrics.successfulRelayMessageCount.
WithLabelValues(
r.destinationChainID.String(),
r.warpMessage.DestinationChainID.String(),
r.relayer.sourceChainID.String(),
r.relayer.sourceSubnetID.String()).Inc()
}

func (r *messageRelayer) incFailedRelayMessageCount(failureReason string) {
r.metrics.failedRelayMessageCount.
WithLabelValues(
r.destinationChainID.String(),
r.warpMessage.DestinationChainID.String(),
r.relayer.sourceChainID.String(),
r.relayer.sourceSubnetID.String(),
failureReason).Inc()
Expand All @@ -536,7 +564,7 @@ func (r *messageRelayer) incFailedRelayMessageCount(failureReason string) {
func (r *messageRelayer) setCreateSignedMessageLatencyMS(latency float64) {
r.metrics.createSignedMessageLatencyMS.
WithLabelValues(
r.destinationChainID.String(),
r.warpMessage.DestinationChainID.String(),
r.relayer.sourceChainID.String(),
r.relayer.sourceSubnetID.String()).Set(latency)
}
Loading

0 comments on commit a83a29e

Please # to comment.