Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Check if message already delivered #23

Merged
merged 13 commits into from
Sep 15, 2023
1 change: 1 addition & 0 deletions messages/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
// for each message protocol, and performs the sending to the destination chain.
type MessageManager interface {
// ShouldSendMessage returns true if the message should be sent to the destination chain
// If an error is returned, the boolean should be ignored by the caller.
ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error)
// SendMessage sends the signed message to the destination chain. The payload parsed according to
// the VM rules is also passed in, since MessageManager does not assume any particular VM
Expand Down
22 changes: 20 additions & 2 deletions messages/teleporter/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/subnet-evm/accounts/abi"
"github.com/ethereum/go-ethereum/common"
)
Expand All @@ -28,12 +29,19 @@ type TeleporterMessageReceipt struct {
RelayerRewardAddress common.Address `json:"relayerRewardAddress"`
}

// ReceiveCrossChainMessageInput is the input to the ReceiveCrossChainMessage
// in the contract deployed on the receiving chain
// ReceiveCrossChainMessageInput is the input to receiveCrossChainMessage call
// in the contract deployed on the destination chain
type ReceiveCrossChainMessageInput struct {
RelayerRewardAddress common.Address `json:"relayerRewardAddress"`
}

// MessageReceivedInput is the input to messageReceived call
// in the contract deployed on the destination chain
type MessageReceivedInput struct {
OriginChainID ids.ID `json:"relayerRewardAddress"`
MessageID *big.Int `json:"messageID"`
}

// unpack Teleporter message bytes according to EVM ABI encoding rules
func unpackTeleporterMessage(messageBytes []byte) (*TeleporterMessage, error) {
args := abi.Arguments{
Expand All @@ -60,3 +68,13 @@ func unpackTeleporterMessage(messageBytes []byte) (*TeleporterMessage, error) {
func packReceiverMessage(inputStruct ReceiveCrossChainMessageInput) ([]byte, error) {
return EVMTeleporterContractABI.Pack("receiveCrossChainMessage", inputStruct.RelayerRewardAddress)
}

func packMessageReceivedMessage(inputStruct MessageReceivedInput) ([]byte, error) {
return EVMTeleporterContractABI.Pack("messageReceived", inputStruct.OriginChainID, inputStruct.MessageID)
}

func unpackMessageReceivedResult(result []byte) (bool, error) {
var success bool
err := EVMTeleporterContractABI.UnpackIntoInterface(&success, "messageReceived", result)
return success, err
}
128 changes: 120 additions & 8 deletions messages/teleporter/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package teleporter

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/ava-labs/avalanchego/cache"
Expand All @@ -14,6 +16,8 @@ import (
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/interfaces"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -70,6 +74,20 @@ func NewMessageManager(
}, nil
}

func isAllowedRelayer(allowedRelayers []common.Address, eoa common.Address) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should relayers be destination specific?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the code level, this particular check is ensuring that the relayer EOA that will send the tx to the destination is included in the Teleporter message's list of allowed relayers. The function above checks that the EVM client that will send the tx is sending to the destination chain ID included in the Teleporter message.

At the architectural level, DestinationClient is the entity responsible for sending the tx to the correct destination chain. MessageRelayers should be composed of exactly one DestinationClient, so in that sense, relayers are destination specific.

// If no allowed relayer addresses were set, then anyone can relay it.
if len(allowedRelayers) == 0 {
return true
}

for _, addr := range allowedRelayers {
if addr == eoa {
return true
}
}
return false
}

// ShouldSendMessage returns true if the message should be sent to the destination chain
func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) {
// Unpack the teleporter message and add it to the cache
Expand All @@ -87,36 +105,121 @@ func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageI
if !ok {
return false, fmt.Errorf("relayer not configured to deliver to destination. destinationChainID=%s", destinationChainID.String())
}
if !destinationClient.Allowed(destinationChainID, teleporterMessage.AllowedRelayerAddresses) {
senderAddress := destinationClient.SenderAddress()
if !isAllowedRelayer(teleporterMessage.AllowedRelayerAddresses, senderAddress) {
m.logger.Info(
"Relayer not allowed to deliver to chain.",
"Relayer EOA not allowed to deliver this message.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", warpMessageInfo.WarpUnsignedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return false, nil
}

delivered, err := m.messageDelivered(
destinationClient,
warpMessageInfo,
teleporterMessage,
destinationChainID,
)
if err != nil {
m.logger.Error(
"Failed to check if message has been delivered to destination chain.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", warpMessageInfo.WarpUnsignedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
zap.Error(err),
)
return false, err
}
if delivered {
m.logger.Info(
"Message already delivered to destination.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return false, nil
}

// Cache the message so it can be reused in SendMessage
m.teleporterMessageCache.Put(warpMessageInfo.WarpUnsignedMessage.ID(), teleporterMessage)
return true, nil
}

// Helper to check if a message has been delivered to the destination chain
// Returns true if the message has been delivered, false if not
// On error, the boolean result should be ignored
func (m *messageManager) messageDelivered(
destinationClient vms.DestinationClient,
warpMessageInfo *vmtypes.WarpMessageInfo,
teleporterMessage *TeleporterMessage,
destinationChainID ids.ID) (bool, error) {
// Check if the message has already been delivered to the destination chain
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
client, ok := destinationClient.Client().(ethclient.Client)
if !ok {
m.logger.Error(
"Destination client is not an Ethereum client.",
zap.String("destinationChainID", destinationChainID.String()),
)
return false, errors.New("destination client is not an Ethereum client")
}

data, err := packMessageReceivedMessage(MessageReceivedInput{
OriginChainID: warpMessageInfo.WarpUnsignedMessage.SourceChainID,
MessageID: teleporterMessage.MessageID,
})
if err != nil {
m.logger.Error(
"Failed packing messageReceived call data.",
zap.String("destinationChainID", destinationChainID.String()),
zap.Error(err),
)
return false, err
}
protocolAddress := common.BytesToAddress(m.protocolAddress[:])
callMessage := interfaces.CallMsg{
gwen917 marked this conversation as resolved.
Show resolved Hide resolved
To: &protocolAddress,
Data: data,
}
result, err := client.CallContract(context.Background(), callMessage, nil)
if err != nil {
m.logger.Error(
"Failed calling messageReceived method on destination chain.",
zap.String("destinationChainID", destinationChainID.String()),
zap.Error(err),
)
return false, err
}
// check the contract call result
delivered, err := unpackMessageReceivedResult(result)
if err != nil {
m.logger.Error(
"Failed unpacking messageReceived result.",
zap.String("destinationChainID", destinationChainID.String()),
zap.Error(err),
)
return false, err
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
}

return delivered, nil
}

// SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage method of the Teleporter contract,
// and dispatches transaction construction and broadcast to the destination client
func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error {
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
var (
teleporterMessage *TeleporterMessage
ok bool
)
teleporterMessage, ok = m.teleporterMessageCache.Get(signedMessage.ID())
teleporterMessage, ok := m.teleporterMessageCache.Get(signedMessage.ID())
if !ok {
m.logger.Debug(
"Teleporter message to send not in cache. Extracting from signed warp message.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
)
var err error
teleporterMessage, err = unpackTeleporterMessage(parsedVmPayload)
if err != nil {
m.logger.Error(
"Failed unpacking teleporter message.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
)
return err
Expand All @@ -126,21 +229,26 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa
m.logger.Info(
"Sending message to destination chain",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
numSigners, err := signedMessage.Signature.NumSigners()
if err != nil {
m.logger.Error(
"Failed to get number of signers",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return err
}
gasLimit, err := CalculateReceiveMessageGasLimit(numSigners, teleporterMessage.RequiredGasLimit)
if err != nil {
m.logger.Error(
"Gas limit required overflowed uint64 max. not relaying message",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return err
}
Expand All @@ -151,7 +259,9 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa
if err != nil {
m.logger.Error(
"Failed packing receiveCrossChainMessage call data",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return err
}
Expand All @@ -165,15 +275,17 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa
if err != nil {
m.logger.Error(
"Failed to send tx.",
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
zap.Error(err),
)
return err
}
m.logger.Info(
"Sent message to destination chain",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return nil
Expand Down
10 changes: 8 additions & 2 deletions vms/destination_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ type DestinationClient interface {
// TODO: Make generic for any VM.
SendTx(signedMessage *warp.Message, toAddress string, gasLimit uint64, callData []byte) error

// Allowed checks if the relayer is allowed to relay the message according to the VM rules and the message metadata
Allowed(chainID ids.ID, allowedRelayers []common.Address) bool
// Client returns the underlying client for the destination chain
Client() interface{}

// SenderAddress returns the address of the relayer on the destination chain
SenderAddress() common.Address

// DestinationChainID returns the ID of the destination chain
DestinationChainID() ids.ID
}

func NewDestinationClient(logger logging.Logger, subnetInfo config.DestinationSubnet) (DestinationClient, error) {
Expand Down
Loading