Skip to content

Commit

Permalink
Migrating Push Gossip to SDK handlers (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim authored Dec 19, 2023
1 parent 862a565 commit dd2d956
Show file tree
Hide file tree
Showing 12 changed files with 531 additions and 105 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/VictoriaMetrics/fastcache v1.10.0
github.com/ava-labs/avalanchego v1.10.18-rc.2
github.com/ava-labs/avalanchego v1.10.18-rc.3
github.com/cespare/cp v0.1.0
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -39,7 +39,7 @@ require (
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
go.uber.org/goleak v1.2.1
go.uber.org/mock v0.2.0
golang.org/x/crypto v0.16.0
golang.org/x/crypto v0.17.0
golang.org/x/exp v0.0.0-20231127185646-65229373498e
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.10.18-rc.2 h1:fjks/pUp7HmGzjupl+f3MfGo5cLkCrDnoCarL8NyqlA=
github.com/ava-labs/avalanchego v1.10.18-rc.2/go.mod h1:D0tP5nGZtGb/vNOzvROUGUER+Gcar73l9qP97vbEarY=
github.com/ava-labs/avalanchego v1.10.18-rc.3 h1:LmTUUu0jXVAyRPs8xH7bD2I2vuW3TyvfLesKCkDBq1c=
github.com/ava-labs/avalanchego v1.10.18-rc.3/go.mod h1:A5cyvLdc2OEOkDTqP7bRRWu/npQN4gcD3e8K2nugp4c=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -648,8 +648,8 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down
12 changes: 6 additions & 6 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,14 @@ func (n *network) Gossip(gossip []byte) error {
return n.appSender.SendAppGossip(context.TODO(), gossip)
}

// AppGossip is called by avalanchego -> VM when there is an incoming AppGossip from a peer
// error returned by this function is expected to be treated as fatal by the engine
// returns error if request could not be parsed as message.Request or when the requestHandler returns an error
func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
// AppGossip is called by avalanchego -> VM when there is an incoming AppGossip
// from a peer. An error returned by this function is treated as fatal by the
// engine.
func (n *network) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
var gossipMsg message.GossipMessage
if _, err := n.codec.Unmarshal(gossipBytes, &gossipMsg); err != nil {
log.Debug("could not parse app gossip", "nodeID", nodeID, "gossipLen", len(gossipBytes), "err", err)
return nil
log.Debug("forwarding AppGossip to SDK network", "nodeID", nodeID, "gossipLen", len(gossipBytes), "err", err)
return n.p2pNetwork.AppGossip(ctx, nodeID, gossipBytes)
}

log.Debug("processing AppGossip from node", "nodeID", nodeID, "msg", gossipMsg)
Expand Down
56 changes: 56 additions & 0 deletions plugin/evm/gossip_mempool.go → plugin/evm/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ethereum/go-ethereum/log"

"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/network/p2p/gossip"

"github.com/ava-labs/coreth/core"
Expand All @@ -19,11 +22,64 @@ import (
)

var (
_ p2p.Handler = (*txGossipHandler)(nil)

_ gossip.Gossipable = (*GossipEthTx)(nil)
_ gossip.Gossipable = (*GossipAtomicTx)(nil)
_ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil)
)

func newTxGossipHandler[T any, U gossip.GossipableAny[T]](
log logging.Logger,
mempool gossip.Set[U],
metrics gossip.Metrics,
maxMessageSize int,
throttlingPeriod time.Duration,
throttlingLimit int,
validators *p2p.Validators,
) txGossipHandler {
// push gossip messages can be handled from any peer
handler := gossip.NewHandler[T, U](
log,
// Don't forward gossip to avoid double-forwarding
gossip.NoOpAccumulator[U]{},
mempool,
metrics,
maxMessageSize,
)

// pull gossip requests are filtered by validators and are throttled
// to prevent spamming
validatorHandler := p2p.NewValidatorHandler(
p2p.NewThrottlerHandler(
handler,
p2p.NewSlidingWindowThrottler(throttlingPeriod, throttlingLimit),
log,
),
validators,
log,
)

return txGossipHandler{
appGossipHandler: handler,
appRequestHandler: validatorHandler,
}
}

type txGossipHandler struct {
p2p.NoOpHandler
appGossipHandler p2p.Handler
appRequestHandler p2p.Handler
}

func (t txGossipHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) {
t.appGossipHandler.AppGossip(ctx, nodeID, gossipBytes)
}

func (t txGossipHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
return t.appRequestHandler.AppRequest(ctx, nodeID, deadline, requestBytes)
}

type GossipAtomicTx struct {
Tx *Tx
}
Expand Down
File renamed without changes.
47 changes: 42 additions & 5 deletions plugin/evm/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package evm

import (
"container/heap"
"context"
"math/big"
"sync"
"time"

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/network/p2p/gossip"

"github.com/ava-labs/coreth/peer"

Expand Down Expand Up @@ -56,10 +58,12 @@ type pushGossiper struct {
ctx *snow.Context
config Config

client peer.NetworkClient
blockchain *core.BlockChain
txPool *txpool.TxPool
atomicMempool *Mempool
client peer.NetworkClient
blockchain *core.BlockChain
txPool *txpool.TxPool
atomicMempool *Mempool
ethTxGossiper gossip.Accumulator[*GossipEthTx]
atomicTxGossiper gossip.Accumulator[*GossipAtomicTx]

// We attempt to batch transactions we need to gossip to avoid runaway
// amplification of mempol chatter.
Expand All @@ -80,7 +84,11 @@ type pushGossiper struct {

// createGossiper constructs and returns a pushGossiper or noopGossiper
// based on whether vm.chainConfig.ApricotPhase4BlockTimestamp is set
func (vm *VM) createGossiper(stats GossipStats) Gossiper {
func (vm *VM) createGossiper(
stats GossipStats,
ethTxGossiper gossip.Accumulator[*GossipEthTx],
atomicTxGossiper gossip.Accumulator[*GossipAtomicTx],
) Gossiper {
net := &pushGossiper{
ctx: vm.ctx,
config: vm.config,
Expand All @@ -96,7 +104,10 @@ func (vm *VM) createGossiper(stats GossipStats) Gossiper {
recentEthTxs: &cache.LRU[common.Hash, interface{}]{Size: recentCacheSize},
codec: vm.networkCodec,
stats: stats,
ethTxGossiper: ethTxGossiper,
atomicTxGossiper: atomicTxGossiper,
}

net.awaitEthTxGossip()
return net
}
Expand Down Expand Up @@ -238,6 +249,12 @@ func (n *pushGossiper) awaitEthTxGossip() {
"err", err,
)
}
if err := n.ethTxGossiper.Gossip(context.TODO()); err != nil {
log.Warn(
"failed to send eth transactions",
"err", err,
)
}
case <-regossipTicker.C:
for _, tx := range n.queueRegossipTxs() {
n.ethTxsToGossip[tx.Hash()] = tx
Expand All @@ -260,6 +277,21 @@ func (n *pushGossiper) awaitEthTxGossip() {
"err", err,
)
}

gossipTxs := make([]*GossipEthTx, 0, len(txs))
for _, tx := range txs {
gossipTxs = append(gossipTxs, &GossipEthTx{Tx: tx})
}

n.ethTxGossiper.Add(gossipTxs...)
if err := n.ethTxGossiper.Gossip(context.TODO()); err != nil {
log.Warn(
"failed to send eth transactions",
"len(txs)", len(txs),
"err", err,
)
}

case <-n.shutdownChan:
return
}
Expand Down Expand Up @@ -301,6 +333,11 @@ func (n *pushGossiper) gossipAtomicTx(tx *Tx) error {
"txID", txID,
)
n.stats.IncAtomicGossipSent()
n.atomicTxGossiper.Add(&GossipAtomicTx{Tx: tx})
if err := n.atomicTxGossiper.Gossip(context.TODO()); err != nil {
return err
}

return n.client.Gossip(msgBytes)
}

Expand Down
54 changes: 45 additions & 9 deletions plugin/evm/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ const (
discardedTxsCacheSize = 50
)

var errNoGasUsed = errors.New("no gas used")
var (
errTxAlreadyKnown = errors.New("tx already known")
errNoGasUsed = errors.New("no gas used")

_ gossip.Set[*GossipAtomicTx] = (*Mempool)(nil)
)

// mempoolMetrics defines the metrics for the atomic mempool
type mempoolMetrics struct {
Expand Down Expand Up @@ -142,16 +147,37 @@ func (m *Mempool) Add(tx *GossipAtomicTx) error {
m.ctx.Lock.RLock()
defer m.ctx.Lock.RUnlock()

return m.AddTx(tx.Tx)
m.lock.Lock()
defer m.lock.Unlock()

err := m.addTx(tx.Tx, false)
if errors.Is(err, errTxAlreadyKnown) {
return err
}

if err != nil {
txID := tx.Tx.ID()
m.discardedTxs.Put(txID, tx.Tx)
log.Debug("failed to issue remote tx to mempool",
"txID", txID,
"err", err,
)
}

return err
}

// Add attempts to add [tx] to the mempool and returns an error if
// it could not be addeed to the mempool.
// AddTx attempts to add [tx] to the mempool and returns an error if
// it could not be added to the mempool.
func (m *Mempool) AddTx(tx *Tx) error {
m.lock.Lock()
defer m.lock.Unlock()

err := m.addTx(tx, false)
if errors.Is(err, errTxAlreadyKnown) {
return nil
}

if err != nil {
// unlike local txs, invalid remote txs are recorded as discarded
// so that they won't be requested again
Expand All @@ -169,15 +195,25 @@ func (m *Mempool) AddLocalTx(tx *Tx) error {
m.lock.Lock()
defer m.lock.Unlock()

return m.addTx(tx, false)
err := m.addTx(tx, false)
if errors.Is(err, errTxAlreadyKnown) {
return nil
}

return err
}

// forceAddTx forcibly adds a *Tx to the mempool and bypasses all verification.
func (m *Mempool) ForceAddTx(tx *Tx) error {
m.lock.Lock()
defer m.lock.Unlock()

return m.addTx(tx, true)
err := m.addTx(tx, true)
if errors.Is(err, errTxAlreadyKnown) {
return nil
}

return nil
}

// checkConflictTx checks for any transactions in the mempool that spend the same input UTXOs as [tx].
Expand Down Expand Up @@ -219,13 +255,13 @@ func (m *Mempool) addTx(tx *Tx, force bool) error {
// If [txID] has already been issued or is in the currentTxs map
// there's no need to add it.
if _, exists := m.issuedTxs[txID]; exists {
return nil
return fmt.Errorf("%w: tx %s was issued previously", errTxAlreadyKnown, tx.ID())
}
if _, exists := m.currentTxs[txID]; exists {
return nil
return fmt.Errorf("%w: tx %s is being built into a block", errTxAlreadyKnown, tx.ID())
}
if _, exists := m.txHeap.Get(txID); exists {
return nil
return fmt.Errorf("%w: tx %s is pending", errTxAlreadyKnown, tx.ID())
}
if !force && m.verify != nil {
if err := m.verify(tx); err != nil {
Expand Down
19 changes: 19 additions & 0 deletions plugin/evm/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,22 @@ func TestMempoolAddTx(t *testing.T) {
require.True(m.bloom.Has(tx))
}
}

// Add should return an error if a tx is already known
func TestMempoolAdd(t *testing.T) {
require := require.New(t)
m, err := NewMempool(&snow.Context{}, 5_000, nil)
require.NoError(err)

tx := &GossipAtomicTx{
Tx: &Tx{
UnsignedAtomicTx: &TestUnsignedTx{
IDV: ids.GenerateTestID(),
},
},
}

require.NoError(m.Add(tx))
err = m.Add(tx)
require.ErrorIs(err, errTxAlreadyKnown)
}
Loading

0 comments on commit dd2d956

Please # to comment.