diff --git a/messages/message_manager.go b/messages/message_manager.go index 3a7601e6..46d608ab 100644 --- a/messages/message_manager.go +++ b/messages/message_manager.go @@ -20,6 +20,7 @@ import ( // for each message protocol, and performs the sending to the destination chain. type MessageManager interface { // ShouldSendMessage returns true if the message should be sent to the destination chain + // If an error is returned, the boolean should be ignored by the caller. ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) // SendMessage sends the signed message to the destination chain. The payload parsed according to // the VM rules is also passed in, since MessageManager does not assume any particular VM diff --git a/messages/teleporter/message.go b/messages/teleporter/message.go index 0d467bef..7729b394 100644 --- a/messages/teleporter/message.go +++ b/messages/teleporter/message.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" + "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/subnet-evm/accounts/abi" "github.com/ethereum/go-ethereum/common" ) @@ -28,12 +29,19 @@ type TeleporterMessageReceipt struct { RelayerRewardAddress common.Address `json:"relayerRewardAddress"` } -// ReceiveCrossChainMessageInput is the input to the ReceiveCrossChainMessage -// in the contract deployed on the receiving chain +// ReceiveCrossChainMessageInput is the input to receiveCrossChainMessage call +// in the contract deployed on the destination chain type ReceiveCrossChainMessageInput struct { RelayerRewardAddress common.Address `json:"relayerRewardAddress"` } +// MessageReceivedInput is the input to messageReceived call +// in the contract deployed on the destination chain +type MessageReceivedInput struct { + OriginChainID ids.ID `json:"relayerRewardAddress"` + MessageID *big.Int `json:"messageID"` +} + // unpack Teleporter message bytes according to EVM ABI encoding rules func unpackTeleporterMessage(messageBytes []byte) (*TeleporterMessage, error) { args := abi.Arguments{ @@ -60,3 +68,13 @@ func unpackTeleporterMessage(messageBytes []byte) (*TeleporterMessage, error) { func packReceiverMessage(inputStruct ReceiveCrossChainMessageInput) ([]byte, error) { return EVMTeleporterContractABI.Pack("receiveCrossChainMessage", inputStruct.RelayerRewardAddress) } + +func packMessageReceivedMessage(inputStruct MessageReceivedInput) ([]byte, error) { + return EVMTeleporterContractABI.Pack("messageReceived", inputStruct.OriginChainID, inputStruct.MessageID) +} + +func unpackMessageReceivedResult(result []byte) (bool, error) { + var success bool + err := EVMTeleporterContractABI.UnpackIntoInterface(&success, "messageReceived", result) + return success, err +} diff --git a/messages/teleporter/message_manager.go b/messages/teleporter/message_manager.go index e7cca282..aa6c152e 100644 --- a/messages/teleporter/message_manager.go +++ b/messages/teleporter/message_manager.go @@ -4,7 +4,9 @@ package teleporter import ( + "context" "encoding/json" + "errors" "fmt" "github.com/ava-labs/avalanchego/cache" @@ -14,6 +16,8 @@ import ( "github.com/ava-labs/awm-relayer/config" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/awm-relayer/vms/vmtypes" + "github.com/ava-labs/subnet-evm/ethclient" + "github.com/ava-labs/subnet-evm/interfaces" "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" ) @@ -70,6 +74,20 @@ func NewMessageManager( }, nil } +func isAllowedRelayer(allowedRelayers []common.Address, eoa common.Address) bool { + // If no allowed relayer addresses were set, then anyone can relay it. + if len(allowedRelayers) == 0 { + return true + } + + for _, addr := range allowedRelayers { + if addr == eoa { + return true + } + } + return false +} + // ShouldSendMessage returns true if the message should be sent to the destination chain func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) { // Unpack the teleporter message and add it to the cache @@ -87,29 +105,113 @@ func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageI if !ok { return false, fmt.Errorf("relayer not configured to deliver to destination. destinationChainID=%s", destinationChainID.String()) } - if !destinationClient.Allowed(destinationChainID, teleporterMessage.AllowedRelayerAddresses) { + senderAddress := destinationClient.SenderAddress() + if !isAllowedRelayer(teleporterMessage.AllowedRelayerAddresses, senderAddress) { m.logger.Info( - "Relayer not allowed to deliver to chain.", + "Relayer EOA not allowed to deliver this message.", zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", warpMessageInfo.WarpUnsignedMessage.ID().String()), + zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), ) return false, nil } + + delivered, err := m.messageDelivered( + destinationClient, + warpMessageInfo, + teleporterMessage, + destinationChainID, + ) + if err != nil { + m.logger.Error( + "Failed to check if message has been delivered to destination chain.", + zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", warpMessageInfo.WarpUnsignedMessage.ID().String()), + zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), + zap.Error(err), + ) + return false, err + } + if delivered { + m.logger.Info( + "Message already delivered to destination.", + zap.String("destinationChainID", destinationChainID.String()), + zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), + ) + return false, nil + } + // Cache the message so it can be reused in SendMessage m.teleporterMessageCache.Put(warpMessageInfo.WarpUnsignedMessage.ID(), teleporterMessage) return true, nil } +// Helper to check if a message has been delivered to the destination chain +// Returns true if the message has been delivered, false if not +// On error, the boolean result should be ignored +func (m *messageManager) messageDelivered( + destinationClient vms.DestinationClient, + warpMessageInfo *vmtypes.WarpMessageInfo, + teleporterMessage *TeleporterMessage, + destinationChainID ids.ID) (bool, error) { + // Check if the message has already been delivered to the destination chain + client, ok := destinationClient.Client().(ethclient.Client) + if !ok { + m.logger.Error( + "Destination client is not an Ethereum client.", + zap.String("destinationChainID", destinationChainID.String()), + ) + return false, errors.New("destination client is not an Ethereum client") + } + + data, err := packMessageReceivedMessage(MessageReceivedInput{ + OriginChainID: warpMessageInfo.WarpUnsignedMessage.SourceChainID, + MessageID: teleporterMessage.MessageID, + }) + if err != nil { + m.logger.Error( + "Failed packing messageReceived call data.", + zap.String("destinationChainID", destinationChainID.String()), + zap.Error(err), + ) + return false, err + } + protocolAddress := common.BytesToAddress(m.protocolAddress[:]) + callMessage := interfaces.CallMsg{ + To: &protocolAddress, + Data: data, + } + result, err := client.CallContract(context.Background(), callMessage, nil) + if err != nil { + m.logger.Error( + "Failed calling messageReceived method on destination chain.", + zap.String("destinationChainID", destinationChainID.String()), + zap.Error(err), + ) + return false, err + } + // check the contract call result + delivered, err := unpackMessageReceivedResult(result) + if err != nil { + m.logger.Error( + "Failed unpacking messageReceived result.", + zap.String("destinationChainID", destinationChainID.String()), + zap.Error(err), + ) + return false, err + } + + return delivered, nil +} + // SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage method of the Teleporter contract, // and dispatches transaction construction and broadcast to the destination client func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error { - var ( - teleporterMessage *TeleporterMessage - ok bool - ) - teleporterMessage, ok = m.teleporterMessageCache.Get(signedMessage.ID()) + teleporterMessage, ok := m.teleporterMessageCache.Get(signedMessage.ID()) if !ok { m.logger.Debug( "Teleporter message to send not in cache. Extracting from signed warp message.", + zap.String("destinationChainID", destinationChainID.String()), zap.String("warpMessageID", signedMessage.ID().String()), ) var err error @@ -117,6 +219,7 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa if err != nil { m.logger.Error( "Failed unpacking teleporter message.", + zap.String("destinationChainID", destinationChainID.String()), zap.String("warpMessageID", signedMessage.ID().String()), ) return err @@ -126,13 +229,16 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa m.logger.Info( "Sending message to destination chain", zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), ) numSigners, err := signedMessage.Signature.NumSigners() if err != nil { m.logger.Error( "Failed to get number of signers", + zap.String("destinationChainID", destinationChainID.String()), zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), ) return err } @@ -140,7 +246,9 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa if err != nil { m.logger.Error( "Gas limit required overflowed uint64 max. not relaying message", + zap.String("destinationChainID", destinationChainID.String()), zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), ) return err } @@ -151,7 +259,9 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa if err != nil { m.logger.Error( "Failed packing receiveCrossChainMessage call data", + zap.String("destinationChainID", destinationChainID.String()), zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), ) return err } @@ -165,8 +275,9 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa if err != nil { m.logger.Error( "Failed to send tx.", - zap.String("warpMessageID", signedMessage.ID().String()), zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), zap.Error(err), ) return err @@ -174,6 +285,7 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa m.logger.Info( "Sent message to destination chain", zap.String("destinationChainID", destinationChainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), zap.String("teleporterMessageID", teleporterMessage.MessageID.String()), ) return nil diff --git a/vms/destination_client.go b/vms/destination_client.go index cec3b06b..44752d0f 100644 --- a/vms/destination_client.go +++ b/vms/destination_client.go @@ -22,8 +22,14 @@ type DestinationClient interface { // TODO: Make generic for any VM. SendTx(signedMessage *warp.Message, toAddress string, gasLimit uint64, callData []byte) error - // Allowed checks if the relayer is allowed to relay the message according to the VM rules and the message metadata - Allowed(chainID ids.ID, allowedRelayers []common.Address) bool + // Client returns the underlying client for the destination chain + Client() interface{} + + // SenderAddress returns the address of the relayer on the destination chain + SenderAddress() common.Address + + // DestinationChainID returns the ID of the destination chain + DestinationChainID() ids.ID } func NewDestinationClient(logger logging.Logger, subnetInfo config.DestinationSubnet) (DestinationClient, error) { diff --git a/vms/evm/destination_client.go b/vms/evm/destination_client.go index 57ade4ce..b904e54d 100644 --- a/vms/evm/destination_client.go +++ b/vms/evm/destination_client.go @@ -95,18 +95,18 @@ func NewDestinationClient(logger logging.Logger, subnetInfo config.DestinationSu }, nil } -func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message, +func (c *destinationClient) SendTx(signedMessage *avalancheWarp.Message, toAddress string, gasLimit uint64, callData []byte) error { // Synchronize teleporter message requests to the same destination chain so that message ordering is preserved - tdc.lock.Lock() - defer tdc.lock.Unlock() + c.lock.Lock() + defer c.lock.Unlock() // We need the global 32-byte representation of the destination chain ID, as well as the destination's configured chainID // Without the destination's configured chainID, transaction signature verification will fail - destinationChainIDBigInt, err := tdc.client.ChainID(context.Background()) + destinationChainIDBigInt, err := c.client.ChainID(context.Background()) if err != nil { - tdc.logger.Error( + c.logger.Error( "Failed to get chain ID from destination chain endpoint", zap.Error(err), ) @@ -114,9 +114,9 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message, } // Get the current base fee estimation, which is based on the previous blocks gas usage. - baseFee, err := tdc.client.EstimateBaseFee(context.Background()) + baseFee, err := c.client.EstimateBaseFee(context.Background()) if err != nil { - tdc.logger.Error( + c.logger.Error( "Failed to get base fee", zap.Error(err), ) @@ -125,9 +125,9 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message, // Get the suggested gas tip cap of the network // TODO: Add a configurable ceiling to this value - gasTipCap, err := tdc.client.SuggestGasTipCap(context.Background()) + gasTipCap, err := c.client.SuggestGasTipCap(context.Background()) if err != nil { - tdc.logger.Error( + c.logger.Error( "Failed to get gas tip cap", zap.Error(err), ) @@ -146,7 +146,7 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message, // Construct the actual transaction to broadcast on the destination chain tx := types.NewTx(&types.DynamicFeeTx{ ChainID: destinationChainIDBigInt, - Nonce: tdc.currentNonce, + Nonce: c.currentNonce, To: &to, Gas: gasLimit, GasFeeCap: gasFeeCap, @@ -163,17 +163,17 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message, // Sign and send the transaction on the destination chain signer := types.LatestSignerForChainID(destinationChainIDBigInt) - signedTx, err := types.SignTx(tx, signer, tdc.pk) + signedTx, err := types.SignTx(tx, signer, c.pk) if err != nil { - tdc.logger.Error( + c.logger.Error( "Failed to sign transaction", zap.Error(err), ) return err } - if err := tdc.client.SendTransaction(context.Background(), signedTx); err != nil { - tdc.logger.Error( + if err := c.client.SendTransaction(context.Background(), signedTx); err != nil { + c.logger.Error( "Failed to send transaction", zap.Error(err), ) @@ -182,8 +182,8 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message, // Increment the nonce to use on the destination chain now that we've sent // a transaction using the current value. - tdc.currentNonce++ - tdc.logger.Info( + c.currentNonce++ + c.logger.Info( "Sent transaction", zap.String("txID", signedTx.Hash().String()), ) @@ -191,34 +191,14 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message, return nil } -func (tdc *destinationClient) isDestination(chainID ids.ID) bool { - if chainID != tdc.destinationChainID { - tdc.logger.Info( - "Destination chain ID for message not supported by relayer.", - zap.String("chainID", chainID.String()), - ) - return false - } - return true +func (c *destinationClient) Client() interface{} { + return c.client } -func (tdc *destinationClient) isAllowedRelayer(allowedRelayers []common.Address) bool { - // If no allowed relayer addresses were set, then anyone can relay it. - if len(allowedRelayers) == 0 { - return true - } - - for _, addr := range allowedRelayers { - if addr == tdc.eoa { - return true - } - } - - tdc.logger.Info("Relayer EOA not allowed to deliver this message.") - return false +func (c *destinationClient) SenderAddress() common.Address { + return c.eoa } -func (tdc *destinationClient) Allowed(chainID ids.ID, allowedRelayers []common.Address) bool { - return tdc.isDestination(chainID) && - tdc.isAllowedRelayer(allowedRelayers) +func (c *destinationClient) DestinationChainID() ids.ID { + return c.destinationChainID }