Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

remove cross-chain handlers #628

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21.12

require (
github.com/VictoriaMetrics/fastcache v1.12.1
github.com/ava-labs/avalanchego v1.11.11-0.20240813180138-7520071656af
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d
github.com/cespare/cp v0.1.0
github.com/crate-crypto/go-ipa v0.0.0-20231025140028-3c0104f4b233
github.com/davecgh/go-spew v1.1.1
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.11.11-0.20240809105844-da3256302ab1 h1:h/09vEeKWELnlHr2Da2cXawD8Qg/9RyHy5SrODt0kkU=
github.com/ava-labs/avalanchego v1.11.11-0.20240809105844-da3256302ab1/go.mod h1:9e0UPXJboybmgFjeTj+SFbK4ugbrdG4t68VdiUW5oQ8=
github.com/ava-labs/avalanchego v1.11.11-0.20240813180138-7520071656af h1:BpAUJHH+QerXi2dMaHbaOzZHCiQAdqS+75Bqzd1alt8=
github.com/ava-labs/avalanchego v1.11.11-0.20240813180138-7520071656af/go.mod h1:8pnf2At/q0LRq5dvYJYn3CkhKzZNHRd5pjARC9psu+g=
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d h1:LyrKJL9avIIxBY3uTcS2dFtUMBFmI2QpAgG6qYTdA6s=
github.com/ava-labs/avalanchego v1.11.11-0.20240813203340-ab83fb41528d/go.mod h1:UkyrRDXK2E15Lq2abyae2Pt+JsWvgsg1pe0/AtoMyAM=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
14 changes: 0 additions & 14 deletions peer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ type NetworkClient interface {
// Returns response bytes, and ErrRequestFailed if the request should be retried.
SendAppRequest(ctx context.Context, nodeID ids.NodeID, request []byte) ([]byte, error)

// SendCrossChainRequest sends a request to a specific blockchain running on this node.
// Returns response bytes, and ErrRequestFailed if the request failed.
SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte) ([]byte, error)

// TrackBandwidth should be called for each valid request with the bandwidth
// (length of response divided by request time), and with 0 if the response is invalid.
TrackBandwidth(nodeID ids.NodeID, bandwidth float64)
Expand Down Expand Up @@ -77,16 +73,6 @@ func (c *client) SendAppRequest(ctx context.Context, nodeID ids.NodeID, request
return waitingHandler.WaitForResult(ctx)
}

// SendCrossChainRequest synchronously sends request to the specified chainID
// Returns response bytes and ErrRequestFailed if the request should be retried.
func (c *client) SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte) ([]byte, error) {
waitingHandler := newWaitingResponseHandler()
if err := c.network.SendCrossChainRequest(ctx, chainID, request, waitingHandler); err != nil {
return nil, err
}
return waitingHandler.WaitForResult(ctx)
}

func (c *client) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
c.network.TrackBandwidth(nodeID, bandwidth)
}
172 changes: 8 additions & 164 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ type Network interface {
// SendAppRequest sends message to given nodeID, notifying handler when there's a response or timeout
SendAppRequest(ctx context.Context, nodeID ids.NodeID, message []byte, handler message.ResponseHandler) error

// SendCrossChainRequest sends a message to given chainID notifying handler when there's a response or timeout
SendCrossChainRequest(ctx context.Context, chainID ids.ID, message []byte, handler message.ResponseHandler) error

// Shutdown stops all peer channel listeners and marks the node to have stopped
// n.Start() can be called again but the peers will have to be reconnected
// by calling OnPeerConnected for each peer
Expand All @@ -65,9 +62,6 @@ type Network interface {
// SetRequestHandler sets the provided request handler as the request handler
SetRequestHandler(handler message.RequestHandler)

// SetCrossChainHandler sets the provided cross chain request handler as the cross chain request handler
SetCrossChainRequestHandler(handler message.CrossChainRequestHandler)

// Size returns the size of the network in number of connected peers
Size() uint32

Expand All @@ -89,17 +83,13 @@ type network struct {
requestIDGen uint32 // requestID counter used to track outbound requests
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
p2pNetwork *p2p.Network
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
appRequestHandler message.RequestHandler // maps request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
Expand All @@ -112,22 +102,18 @@ type network struct {
closed utils.Atomic[bool]
}

func NewNetwork(p2pNetwork *p2p.Network, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(p2pNetwork *p2p.Network, appSender common.AppSender, codec codec.Manager, self ids.NodeID, maxActiveAppRequests int64) Network {
return &network{
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
self: self,
outstandingRequestHandlers: make(map[uint32]message.ResponseHandler),
activeAppRequests: semaphore.NewWeighted(maxActiveAppRequests),
activeCrossChainRequests: semaphore.NewWeighted(maxActiveCrossChainRequests),
p2pNetwork: p2pNetwork,
gossipHandler: message.NoopMempoolGossipHandler{},
appRequestHandler: message.NoopRequestHandler{},
crossChainRequestHandler: message.NoopCrossChainRequestHandler{},
peers: NewPeerTracker(),
appStats: stats.NewRequestHandlerStats(),
crossChainStats: stats.NewCrossChainRequestHandlerStats(),
}
}

Expand Down Expand Up @@ -225,141 +211,6 @@ func (n *network) sendAppRequest(ctx context.Context, nodeID ids.NodeID, request
return nil
}

// SendCrossChainRequest sends request message bytes to specified chainID and adds [handler] to [outstandingRequestHandlers]
// so that it can be invoked when the network receives either a response or failure message.
// Returns an error if [appSender] is unable to make the request.
func (n *network) SendCrossChainRequest(ctx context.Context, chainID ids.ID, request []byte, handler message.ResponseHandler) error {
// Take a slot from total [activeCrossChainRequests] and block until a slot becomes available.
if err := n.activeCrossChainRequests.Acquire(ctx, 1); err != nil {
return errAcquiringSemaphore
}

n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
n.activeCrossChainRequests.Release(1)
return nil
}

// If the context was cancelled, we can skip sending this request.
if err := ctx.Err(); err != nil {
n.activeCrossChainRequests.Release(1)
return err
}

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = handler

// Send cross chain request to [chainID].
// On failure, release the slot from [activeCrossChainRequests] and delete
// request from [outstandingRequestHandlers].
//
// Cancellation is removed from this context to avoid erroring unexpectedly.
// SendCrossChainAppRequest should be non-blocking and any error other than
// context cancellation is unexpected.
//
// This guarantees that the network should never receive an unexpected
// CrossChainAppResponse.
ctxWithoutCancel := context.WithoutCancel(ctx)
if err := n.appSender.SendCrossChainAppRequest(ctxWithoutCancel, chainID, requestID, request); err != nil {
log.Error(
"request to chain failed",
"chainID", chainID,
"requestID", requestID,
"requestLen", len(request),
"error", err,
)

n.activeCrossChainRequests.Release(1)
delete(n.outstandingRequestHandlers, requestID)
return err
}

log.Debug("sent request message to chain", "chainID", chainID, "crossChainRequestID", requestID)
return nil
}

// CrossChainAppRequest notifies the VM when another chain in the network requests for data.
// Send a CrossChainAppResponse to [chainID] in response to a valid message using the same
// [requestID] before the deadline.
func (n *network) CrossChainAppRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, deadline time.Time, request []byte) error {
if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppRequest from chain", "requestingChainID", requestingChainID, "requestID", requestID, "requestLen", len(request))

var req message.CrossChainRequest
if _, err := n.crossChainCodec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal CrossChainAppRequest", "requestingChainID", requestingChainID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.crossChainStats)
if err != nil {
log.Debug("deadline to process CrossChainAppRequest has expired, skipping", "requestingChainID", requestingChainID, "requestID", requestID, "err", err)
return nil
}

log.Debug("processing incoming CrossChainAppRequest", "requestingChainID", requestingChainID, "requestID", requestID, "req", req)
handleCtx, cancel := context.WithDeadline(context.Background(), bufferedDeadline)
defer cancel()

responseBytes, err := req.Handle(handleCtx, requestingChainID, requestID, n.crossChainRequestHandler)
switch {
case err != nil && err != context.DeadlineExceeded:
return err // Return a fatal error
case responseBytes != nil:
return n.appSender.SendCrossChainAppResponse(ctx, requestingChainID, requestID, responseBytes) // Propagate fatal error
default:
return nil
}
}

// CrossChainAppRequestFailed can be called by the avalanchego -> VM in following cases:
// - respondingChain doesn't exist
// - invalid CrossChainAppResponse from respondingChain
// - invalid CrossChainRequest was sent to respondingChain
// - request times out before a response is provided
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) CrossChainAppRequestFailed(ctx context.Context, respondingChainID ids.ID, requestID uint32, _ *common.AppError) error {
log.Debug("received CrossChainAppRequestFailed from chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Can happen after the network has been closed.
log.Debug("received CrossChainAppRequestFailed to unknown request", "respondingChainID", respondingChainID, "requestID", requestID)
return nil
}

// We must release the slot
n.activeCrossChainRequests.Release(1)

return handler.OnFailure()
}

// CrossChainAppResponse is invoked when there is a
// response received from [respondingChainID] regarding a request the VM sent out
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) CrossChainAppResponse(ctx context.Context, respondingChainID ids.ID, requestID uint32, response []byte) error {
log.Debug("received CrossChainAppResponse from responding chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Can happen after the network has been closed.
log.Debug("received CrossChainAppResponse to unknown request", "respondingChainID", respondingChainID, "requestID", requestID, "responseLen", len(response))
return nil
}

// We must release the slot
n.activeCrossChainRequests.Release(1)

return handler.OnResponse(response)
}

// AppRequest is called by avalanchego -> VM when there is an incoming AppRequest from a peer
// error returned by this function is expected to be treated as fatal by the engine
// returns error if the requestHandler returns an error
Expand Down Expand Up @@ -443,7 +294,7 @@ func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, reque

// calculateTimeUntilDeadline calculates the time until deadline and drops it if we missed he deadline to response.
// This function updates metrics for both app requests and cross chain requests.
// This is called by either [AppRequest] or [CrossChainAppRequest].
// This is called by [AppRequest].
func calculateTimeUntilDeadline(deadline time.Time, stats stats.RequestHandlerStats) (time.Time, error) {
// calculate how much time is left until the deadline
timeTillDeadline := time.Until(deadline)
Expand Down Expand Up @@ -560,13 +411,6 @@ func (n *network) SetRequestHandler(handler message.RequestHandler) {
n.appRequestHandler = handler
}

func (n *network) SetCrossChainRequestHandler(handler message.CrossChainRequestHandler) {
n.lock.Lock()
defer n.lock.Unlock()

n.crossChainRequestHandler = handler
}

func (n *network) Size() uint32 {
n.lock.RLock()
defer n.lock.RUnlock()
Expand Down
Loading
Loading