diff --git a/snow/engine/snowman/issuer.go b/snow/engine/snowman/issuer.go index b3677d3cc21e..9af5fb9716ac 100644 --- a/snow/engine/snowman/issuer.go +++ b/snow/engine/snowman/issuer.go @@ -10,45 +10,30 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/consensus/snowman" - "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/snow/engine/snowman/job" ) +var _ job.Job[ids.ID] = (*issuer)(nil) + // issuer issues [blk] into to consensus after its dependencies are met. type issuer struct { t *Transitive nodeID ids.NodeID // nodeID of the peer that provided this block blk snowman.Block - issuedMetric prometheus.Counter - abandoned bool - deps set.Set[ids.ID] push bool + issuedMetric prometheus.Counter } -func (i *issuer) Dependencies() set.Set[ids.ID] { - return i.deps -} - -// Mark that a dependency has been met -func (i *issuer) Fulfill(ctx context.Context, id ids.ID) { - i.deps.Remove(id) - i.Update(ctx) -} - -// Abandon the attempt to issue [i.block] -func (i *issuer) Abandon(ctx context.Context, _ ids.ID) { - if !i.abandoned { - blkID := i.blk.ID() - i.t.removeFromPending(i.blk) - i.t.addToNonVerifieds(i.blk) - i.t.blocked.Abandon(ctx, blkID) +func (i *issuer) Execute(ctx context.Context, _ []ids.ID, abandoned []ids.ID) error { + if len(abandoned) == 0 { + // If the parent block wasn't abandoned, this block can be issued. + return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric) } - i.abandoned = true -} -func (i *issuer) Update(ctx context.Context) { - if i.abandoned || i.deps.Len() != 0 || i.t.errs.Errored() { - return - } - // Issue the block into consensus - i.t.errs.Add(i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric)) + // If the parent block was abandoned, this block should be abandoned as + // well. 
+ blkID := i.blk.ID() + i.t.removeFromPending(i.blk) + i.t.addToNonVerifieds(i.blk) + return i.t.blocked.Abandon(ctx, blkID) } diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go new file mode 100644 index 000000000000..e05f27130dec --- /dev/null +++ b/snow/engine/snowman/job/scheduler.go @@ -0,0 +1,109 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +// Package job provides a Scheduler to manage and execute Jobs with +// dependencies. +package job + +import "context" + +// Job is a unit of work that can be executed based on the result of resolving +// requested dependencies. +type Job[T any] interface { + Execute(ctx context.Context, fulfilled []T, abandoned []T) error +} + +type job[T comparable] struct { + // Once all dependencies are resolved, the job will be executed. + numUnresolved int + fulfilled []T + abandoned []T + job Job[T] +} + +// Scheduler implements a dependency graph for jobs. Jobs can be registered with +// dependencies, and once all dependencies are resolved, the job will be +// executed. +type Scheduler[T comparable] struct { + // dependents maps a dependency to the jobs that depend on it. + dependents map[T][]*job[T] +} + +func NewScheduler[T comparable]() *Scheduler[T] { + return &Scheduler[T]{ + dependents: make(map[T][]*job[T]), + } +} + +// Schedule a job to be executed once all of its dependencies are resolved. If a +// job is scheduled with no dependencies, it's executed immediately. +// +// In order to prevent a memory leak, all dependencies must eventually either be +// fulfilled or abandoned. +// +// While registering a job with duplicate dependencies is discouraged, it is +// allowed. 
+func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job[T], dependencies ...T) error { + numUnresolved := len(dependencies) + if numUnresolved == 0 { + return userJob.Execute(ctx, nil, nil) + } + + j := &job[T]{ + numUnresolved: numUnresolved, + job: userJob, + } + for _, d := range dependencies { + s.dependents[d] = append(s.dependents[d], j) + } + return nil +} + +// NumDependencies returns the number of dependencies that jobs are currently +// blocking on. +func (s *Scheduler[_]) NumDependencies() int { + return len(s.dependents) +} + +// Fulfill a dependency. If all dependencies for a job are resolved, the job +// will be executed. +// +// It is safe to call the scheduler during the execution of a job. +func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error { + return s.resolveDependency(ctx, dependency, true) +} + +// Abandon a dependency. If all dependencies for a job are resolved, the job +// will be executed. +// +// It is safe to call the scheduler during the execution of a job. +func (s *Scheduler[T]) Abandon(ctx context.Context, dependency T) error { + return s.resolveDependency(ctx, dependency, false) +} + +func (s *Scheduler[T]) resolveDependency( + ctx context.Context, + dependency T, + fulfilled bool, +) error { + jobs := s.dependents[dependency] + delete(s.dependents, dependency) + + for _, job := range jobs { + job.numUnresolved-- + if fulfilled { + job.fulfilled = append(job.fulfilled, dependency) + } else { + job.abandoned = append(job.abandoned, dependency) + } + + if job.numUnresolved > 0 { + continue + } + + if err := job.job.Execute(ctx, job.fulfilled, job.abandoned); err != nil { + return err + } + } + return nil +} diff --git a/snow/engine/snowman/job/scheduler_test.go b/snow/engine/snowman/job/scheduler_test.go new file mode 100644 index 000000000000..db6502c5f74c --- /dev/null +++ b/snow/engine/snowman/job/scheduler_test.go @@ -0,0 +1,338 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. 
+// See the file LICENSE for licensing terms. + +package job + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +const ( + depToResolve = iota + depToNeglect +) + +var errDuplicateExecution = errors.New("job already executed") + +type testJob struct { + calledExecute bool + fulfilled []int + abandoned []int +} + +func (j *testJob) Execute(_ context.Context, fulfilled []int, abandoned []int) error { + if j.calledExecute { + return errDuplicateExecution + } + j.calledExecute = true + j.fulfilled = fulfilled + j.abandoned = abandoned + return nil +} + +func (j *testJob) reset() { + j.calledExecute = false + j.fulfilled = nil + j.abandoned = nil +} + +func newSchedulerWithJob[T comparable]( + t *testing.T, + job Job[T], + dependencies []T, + fulfilled []T, + abandoned []T, +) *Scheduler[T] { + s := NewScheduler[T]() + require.NoError(t, s.Schedule(context.Background(), job, dependencies...)) + for _, d := range fulfilled { + require.NoError(t, s.Fulfill(context.Background(), d)) + } + for _, d := range abandoned { + require.NoError(t, s.Abandon(context.Background(), d)) + } + return s +} + +func TestScheduler_Schedule(t *testing.T) { + userJob := &testJob{} + tests := []struct { + name string + scheduler *Scheduler[int] + dependencies []int + expectedExecuted bool + expectedNumDependencies int + expectedScheduler *Scheduler[int] + }{ + { + name: "no dependencies", + scheduler: NewScheduler[int](), + dependencies: nil, + expectedExecuted: true, + expectedNumDependencies: 0, + expectedScheduler: NewScheduler[int](), + }, + { + name: "one dependency", + scheduler: NewScheduler[int](), + dependencies: []int{depToResolve}, + expectedExecuted: false, + expectedNumDependencies: 1, + expectedScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToResolve: { + { + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, + }, + }, + }, + }, + }, + { + name: "two dependencies", + scheduler: NewScheduler[int](), 
+ dependencies: []int{depToResolve, depToNeglect}, + expectedExecuted: false, + expectedNumDependencies: 2, + expectedScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToResolve: { + { + numUnresolved: 2, + fulfilled: nil, + abandoned: nil, + job: userJob, + }, + }, + depToNeglect: { + { + numUnresolved: 2, + fulfilled: nil, + abandoned: nil, + job: userJob, + }, + }, + }, + }, + }, + { + name: "additional dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + dependencies: []int{depToResolve}, + expectedExecuted: false, + expectedNumDependencies: 1, + expectedScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToResolve: { + { + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, + }, + { + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, + }, + }, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + // Reset the variable between tests + userJob.reset() + + require.NoError(test.scheduler.Schedule(context.Background(), userJob, test.dependencies...)) + require.Equal(test.expectedNumDependencies, test.scheduler.NumDependencies()) + require.Equal(test.expectedExecuted, userJob.calledExecute) + require.Empty(userJob.fulfilled) + require.Empty(userJob.abandoned) + require.Equal(test.expectedScheduler, test.scheduler) + }) + } +} + +func TestScheduler_Fulfill(t *testing.T) { + userJob := &testJob{} + tests := []struct { + name string + scheduler *Scheduler[int] + expectedExecuted bool + expectedFulfilled []int + expectedAbandoned []int + expectedScheduler *Scheduler[int] + }{ + { + name: "no jobs", + scheduler: NewScheduler[int](), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: NewScheduler[int](), + }, + { + name: "single dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + expectedExecuted: true, + 
expectedFulfilled: []int{depToResolve}, + expectedAbandoned: nil, + expectedScheduler: NewScheduler[int](), + }, + { + name: "non-existent dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + }, + { + name: "incomplete dependencies", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToNeglect: { + { + numUnresolved: 1, + fulfilled: []int{depToResolve}, + abandoned: nil, + job: userJob, + }, + }, + }, + }, + }, + { + name: "duplicate dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), + expectedExecuted: true, + expectedFulfilled: []int{depToResolve, depToResolve}, + expectedAbandoned: nil, + expectedScheduler: NewScheduler[int](), + }, + { + name: "previously abandoned", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, []int{depToNeglect}), + expectedExecuted: true, + expectedFulfilled: []int{depToResolve}, + expectedAbandoned: []int{depToNeglect}, + expectedScheduler: NewScheduler[int](), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + // Reset the variable between tests + userJob.reset() + + require.NoError(test.scheduler.Fulfill(context.Background(), depToResolve)) + require.Equal(test.expectedExecuted, userJob.calledExecute) + require.Equal(test.expectedFulfilled, userJob.fulfilled) + require.Equal(test.expectedAbandoned, userJob.abandoned) + require.Equal(test.expectedScheduler, test.scheduler) + }) + } +} + +func TestScheduler_Abandon(t *testing.T) { + userJob := &testJob{} + tests := []struct { + name string + 
scheduler *Scheduler[int] + expectedExecuted bool + expectedFulfilled []int + expectedAbandoned []int + expectedScheduler *Scheduler[int] + }{ + { + name: "no jobs", + scheduler: NewScheduler[int](), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: NewScheduler[int](), + }, + { + name: "single dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + expectedExecuted: true, + expectedFulfilled: nil, + expectedAbandoned: []int{depToResolve}, + expectedScheduler: NewScheduler[int](), + }, + { + name: "non-existent dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + }, + { + name: "incomplete dependencies", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), + expectedExecuted: false, + expectedFulfilled: nil, + expectedAbandoned: nil, + expectedScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToNeglect: { + { + numUnresolved: 1, + fulfilled: nil, + abandoned: []int{depToResolve}, + job: userJob, + }, + }, + }, + }, + }, + { + name: "duplicate dependency", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), + expectedExecuted: true, + expectedFulfilled: nil, + expectedAbandoned: []int{depToResolve, depToResolve}, + expectedScheduler: NewScheduler[int](), + }, + { + name: "previously fulfilled", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, []int{depToNeglect}, nil), + expectedExecuted: true, + expectedFulfilled: []int{depToNeglect}, + expectedAbandoned: []int{depToResolve}, + expectedScheduler: NewScheduler[int](), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + // Reset the variable between 
tests + userJob.reset() + + require.NoError(test.scheduler.Abandon(context.Background(), depToResolve)) + require.Equal(test.expectedExecuted, userJob.calledExecute) + require.Equal(test.expectedFulfilled, userJob.fulfilled) + require.Equal(test.expectedAbandoned, userJob.abandoned) + require.Equal(test.expectedScheduler, test.scheduler) + }) + } +} diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index f424ff993e99..680cb9c5e09c 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -21,7 +21,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/snowman/ancestor" - "github.com/ava-labs/avalanchego/snow/event" + "github.com/ava-labs/avalanchego/snow/engine/snowman/job" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/bag" "github.com/ava-labs/avalanchego/utils/bimap" @@ -30,7 +30,6 @@ import ( "github.com/ava-labs/avalanchego/utils/math" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/utils/units" - "github.com/ava-labs/avalanchego/utils/wrappers" ) const nonVerifiedCacheSize = 64 * units.MiB @@ -83,14 +82,11 @@ type Transitive struct { // operations that are blocked on a block being issued. This could be // issuing another block, responding to a query, or applying votes to consensus - blocked event.Blocker + blocked *job.Scheduler[ids.ID] // number of times build block needs to be called once the number of // processing blocks has gone below the optimal number. 
pendingBuildBlocks int - - // errs tracks if an error has occurred in a callback - errs wrappers.Errs } func New(config Config) (*Transitive, error) { @@ -147,6 +143,7 @@ func New(config Config) (*Transitive, error) { nonVerifieds: ancestor.NewTree(), nonVerifiedCache: nonVerifiedCache, acceptedFrontiers: acceptedFrontiers, + blocked: job.NewScheduler[ids.ID](), polls: polls, blkReqs: bimap.New[common.Request, ids.ID](), blkReqSourceMetric: make(map[common.Request]prometheus.Counter), @@ -293,8 +290,11 @@ func (t *Transitive) GetFailed(ctx context.Context, nodeID ids.NodeID, requestID } delete(t.blkReqSourceMetric, req) - // Because the get request was dropped, we no longer expect blkID to be issued. - t.blocked.Abandon(ctx, blkID) + // Because the get request was dropped, we no longer expect blkID to be + // issued. + if err := t.blocked.Abandon(ctx, blkID); err != nil { + return err + } return t.executeDeferredWork(ctx) } @@ -391,21 +391,24 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin // issued into consensus v := &voter{ t: t, - vdr: nodeID, + nodeID: nodeID, requestID: requestID, responseOptions: responseOptions, } // Wait until [preferredID] and [preferredIDAtHeight] have been issued to // consensus before applying this chit. 
+ var deps []ids.ID if !addedPreferred { - v.deps.Add(preferredID) + deps = append(deps, preferredID) } if !addedPreferredIDAtHeight { - v.deps.Add(preferredIDAtHeight) + deps = append(deps, preferredIDAtHeight) } - t.blocked.Register(ctx, v) + if err := t.blocked.Schedule(ctx, v, deps...); err != nil { + return err + } return t.executeDeferredWork(ctx) } @@ -415,14 +418,14 @@ func (t *Transitive) QueryFailed(ctx context.Context, nodeID ids.NodeID, request return t.Chits(ctx, nodeID, requestID, lastAccepted, lastAccepted, lastAccepted) } - t.blocked.Register( - ctx, - &voter{ - t: t, - vdr: nodeID, - requestID: requestID, - }, - ) + v := &voter{ + t: t, + nodeID: nodeID, + requestID: requestID, + } + if err := t.blocked.Schedule(ctx, v); err != nil { + return err + } return t.executeDeferredWork(ctx) } @@ -534,7 +537,7 @@ func (t *Transitive) HealthCheck(ctx context.Context) (interface{}, error) { zap.Uint32("requestID", t.requestID), zap.Stringer("polls", t.polls), zap.Reflect("outstandingBlockRequests", t.blkReqs), - zap.Stringer("blockedJobs", &t.blocked), + zap.Int("numMissingDependencies", t.blocked.NumDependencies()), zap.Int("pendingBuildBlocks", t.pendingBuildBlocks), ) @@ -560,7 +563,7 @@ func (t *Transitive) executeDeferredWork(ctx context.Context) error { t.metrics.numRequests.Set(float64(t.blkReqs.Len())) t.metrics.numBlocked.Set(float64(len(t.pending))) - t.metrics.numBlockers.Set(float64(t.blocked.Len())) + t.metrics.numBlockers.Set(float64(t.blocked.NumDependencies())) t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len())) return nil } @@ -646,9 +649,6 @@ func (t *Transitive) sendChits(ctx context.Context, nodeID ids.NodeID, requestID // Build blocks if they have been requested and the number of processing blocks // is less than optimal. 
func (t *Transitive) buildBlocks(ctx context.Context) error { - if err := t.errs.Err; err != nil { - return err - } for t.pendingBuildBlocks > 0 && t.Consensus.NumProcessing() < t.Params.OptimalProcessing { t.pendingBuildBlocks-- @@ -758,14 +758,14 @@ func (t *Transitive) issueFrom( delete(t.blkReqSourceMetric, req) } - issued := t.isDecided(blk) || t.Consensus.Processing(blkID) - if issued { - // A dependency should never be waiting on a decided or processing - // block. However, if the block was marked as rejected by the VM, the - // dependencies may still be waiting. Therefore, they should abandoned. - t.blocked.Abandon(ctx, blkID) + if !t.isDecided(blk) && !t.Consensus.Processing(blkID) { + return false, nil } - return issued, t.errs.Err + + // A dependency should never be waiting on a decided or processing block. + // However, if the block was marked as rejected by the VM, the dependencies + // may still be waiting. Therefore, they should be abandoned. + return true, t.blocked.Abandon(ctx, blkID) } // issueWithAncestors attempts to issue the branch ending with [blk] to consensus. @@ -806,8 +806,7 @@ func (t *Transitive) issueWithAncestors( // We don't have this block and have no reason to expect that we will get it. // Abandon the block to avoid a memory leak. - t.blocked.Abandon(ctx, blkID) - return false, t.errs.Err + return false, t.blocked.Abandon(ctx, blkID) } // If the block has been decided, then it is marked as having been issued. 
@@ -843,22 +842,24 @@ func (t *Transitive) issue( t: t, nodeID: nodeID, blk: blk, - issuedMetric: issuedMetric, push: push, + issuedMetric: issuedMetric, } // block on the parent if needed - parentID := blk.Parent() + var ( + parentID = blk.Parent() + deps []ids.ID + ) if parent, err := t.getBlock(ctx, parentID); err != nil || !(t.isDecided(parent) || t.Consensus.Processing(parentID)) { t.Ctx.Log.Verbo("block waiting for parent to be issued", zap.Stringer("blkID", blkID), zap.Stringer("parentID", parentID), ) - i.deps.Add(parentID) + deps = append(deps, parentID) } - t.blocked.Register(ctx, i) - return t.errs.Err + return t.blocked.Schedule(ctx, i, deps...) } // Request that [vdr] send us block [blkID] @@ -962,20 +963,18 @@ func (t *Transitive) deliver( // If [blk] is decided, then it shouldn't be added to consensus. // Similarly, if [blkID] is already in the processing set, it shouldn't // be added to consensus again. - t.blocked.Abandon(ctx, blkID) - return t.errs.Err + return t.blocked.Abandon(ctx, blkID) } parentID := blk.Parent() parent, err := t.getBlock(ctx, parentID) // Because the dependency must have been fulfilled by the time this function // is called - we don't expect [err] to be non-nil. But it is handled for - // completness and future proofing. + // completeness and future proofing. if err != nil || !(parent.Status() == choices.Accepted || t.Consensus.Processing(parentID)) { // if the parent isn't processing or the last accepted block, then this // block is effectively rejected - t.blocked.Abandon(ctx, blkID) - return t.errs.Err + return t.blocked.Abandon(ctx, blkID) } // By ensuring that the parent is either processing or accepted, it is @@ -986,8 +985,7 @@ func (t *Transitive) deliver( return err } if !blkAdded { - t.blocked.Abandon(ctx, blkID) - return t.errs.Err + return t.blocked.Abandon(ctx, blkID) } // Add all the oracle blocks if they exist. 
We call verify on all the blocks @@ -1026,7 +1024,9 @@ func (t *Transitive) deliver( t.sendQuery(ctx, blkID, blk.Bytes(), push) } - t.blocked.Fulfill(ctx, blkID) + if err := t.blocked.Fulfill(ctx, blkID); err != nil { + return err + } for _, blk := range added { blkID := blk.ID() if t.Consensus.IsPreferred(blkID) { @@ -1034,7 +1034,9 @@ func (t *Transitive) deliver( } t.removeFromPending(blk) - t.blocked.Fulfill(ctx, blkID) + if err := t.blocked.Fulfill(ctx, blkID); err != nil { + return err + } if req, ok := t.blkReqs.DeleteValue(blkID); ok { delete(t.blkReqSourceMetric, req) } @@ -1042,7 +1044,9 @@ func (t *Transitive) deliver( for _, blk := range dropped { blkID := blk.ID() t.removeFromPending(blk) - t.blocked.Abandon(ctx, blkID) + if err := t.blocked.Abandon(ctx, blkID); err != nil { + return err + } if req, ok := t.blkReqs.DeleteValue(blkID); ok { delete(t.blkReqSourceMetric, req) } @@ -1052,12 +1056,12 @@ func (t *Transitive) deliver( // immediately by votes that were pending their issuance. If this is the // case, we should not be requesting any chits. if t.Consensus.NumProcessing() == 0 { - return t.errs.Err + return nil } // If we should issue multiple queries at the same time, we need to repoll t.repoll(ctx) - return t.errs.Err + return nil } // Returns true if the block whose ID is [blkID] is waiting to be issued to consensus diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 8c912df64a4a..18a1d320495e 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -170,7 +170,7 @@ func TestEngineDropsAttemptToIssueBlockAfterFailedRequest(t *testing.T) { // job blocked on [parent]'s issuance. 
require.NoError(engine.Put(context.Background(), peerID, 0, child.Bytes())) require.NotNil(request) - require.Len(engine.blocked, 1) + require.Equal(1, engine.blocked.NumDependencies()) vm.ParseBlockF = func(context.Context, []byte) (snowman.Block, error) { return nil, errUnknownBytes @@ -179,7 +179,7 @@ func TestEngineDropsAttemptToIssueBlockAfterFailedRequest(t *testing.T) { // Because this request doesn't provide [parent], the [child] job should be // cancelled. require.NoError(engine.Put(context.Background(), request.NodeID, request.RequestID, nil)) - require.Empty(engine.blocked) + require.Zero(engine.blocked.NumDependencies()) } func TestEngineQuery(t *testing.T) { @@ -315,7 +315,7 @@ func TestEngineQuery(t *testing.T) { require.NoError(engine.Put(context.Background(), getRequest.NodeID, getRequest.RequestID, child.Bytes())) require.Equal(choices.Accepted, parent.Status()) require.Equal(choices.Accepted, child.Status()) - require.Empty(engine.blocked) + require.Zero(engine.blocked.NumDependencies()) } func TestEngineMultipleQuery(t *testing.T) { @@ -461,7 +461,7 @@ func TestEngineMultipleQuery(t *testing.T) { require.NoError(te.Chits(context.Background(), vdr2, *queryRequestID, blk0.ID(), blk0.ID(), blk0.ID())) require.Equal(choices.Accepted, blk1.Status()) - require.Empty(te.blocked) + require.Zero(te.blocked.NumDependencies()) } func TestEngineBlockedIssue(t *testing.T) { @@ -879,12 +879,12 @@ func TestEngineAbandonChit(t *testing.T) { // Register a voter dependency on an unknown block. 
require.NoError(te.Chits(context.Background(), vdr, reqID, fakeBlkID, fakeBlkID, fakeBlkID)) - require.Len(te.blocked, 1) + require.Equal(1, te.blocked.NumDependencies()) sender.CantSendPullQuery = false require.NoError(te.GetFailed(context.Background(), vdr, reqID)) - require.Empty(te.blocked) + require.Zero(te.blocked.NumDependencies()) } func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { @@ -932,7 +932,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { // Register a voter dependency on an unknown block. require.NoError(te.Chits(context.Background(), vdr, reqID, fakeBlkID, fakeBlkID, fakeBlkID)) - require.Len(te.blocked, 1) + require.Equal(1, te.blocked.NumDependencies()) sender.CantSendPullQuery = false @@ -944,7 +944,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) { // Respond with an unexpected block and verify that the request is correctly // cleared. require.NoError(te.Put(context.Background(), vdr, reqID, snowmantest.GenesisBytes)) - require.Empty(te.blocked) + require.Zero(te.blocked.NumDependencies()) } func TestEngineBlockingChitRequest(t *testing.T) { @@ -988,7 +988,7 @@ func TestEngineBlockingChitRequest(t *testing.T) { require.NoError(te.PushQuery(context.Background(), vdr, 0, blockingBlk.Bytes(), 0)) - require.Len(te.blocked, 2) + require.Equal(2, te.blocked.NumDependencies()) sender.CantSendPullQuery = false @@ -1000,7 +1000,7 @@ func TestEngineBlockingChitRequest(t *testing.T) { te.metrics.issued.WithLabelValues(unknownSource), )) - require.Empty(te.blocked) + require.Zero(te.blocked.NumDependencies()) } func TestEngineBlockingChitResponse(t *testing.T) { @@ -1098,7 +1098,7 @@ func TestEngineBlockingChitResponse(t *testing.T) { missingBlk.ID(), blockingBlk.ID(), )) - require.Len(te.blocked, 2) + require.Equal(2, te.blocked.NumDependencies()) queryRequest = nil sender.SendPullQueryF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, blkID ids.ID, requestedHeight uint64) { @@ 
-2857,6 +2857,137 @@ func TestEngineVoteStallRegression(t *testing.T) { require.Equal(choices.Rejected, rejectedChain[0].Status()) } +// When a voter is registered with multiple dependencies, the engine must not +// execute the voter until all of the dependencies have been resolved; even if +// one of the dependencies has been abandoned. +func TestEngineEarlyTerminateVoterRegression(t *testing.T) { + require := require.New(t) + + config := DefaultConfig(t) + nodeID := ids.GenerateTestNodeID() + require.NoError(config.Validators.AddStaker(config.Ctx.SubnetID, nodeID, nil, ids.Empty, 1)) + + sender := &common.SenderTest{ + T: t, + SendChitsF: func(context.Context, ids.NodeID, uint32, ids.ID, ids.ID, ids.ID) {}, + } + sender.Default(true) + config.Sender = sender + + chain := snowmantest.BuildDescendants(snowmantest.Genesis, 3) + vm := &block.TestVM{ + TestVM: common.TestVM{ + T: t, + InitializeF: func( + context.Context, + *snow.Context, + database.Database, + []byte, + []byte, + []byte, + chan<- common.Message, + []*common.Fx, + common.AppSender, + ) error { + return nil + }, + SetStateF: func(context.Context, snow.State) error { + return nil + }, + }, + ParseBlockF: MakeParseBlockF( + []*snowmantest.Block{snowmantest.Genesis}, + chain, + ), + GetBlockF: MakeGetBlockF( + []*snowmantest.Block{snowmantest.Genesis}, + ), + SetPreferenceF: func(context.Context, ids.ID) error { + return nil + }, + LastAcceptedF: MakeLastAcceptedBlockF( + snowmantest.Genesis, + chain, + ), + } + vm.Default(true) + config.VM = vm + + engine, err := New(config) + require.NoError(err) + require.NoError(engine.Start(context.Background(), 0)) + + var pollRequestIDs []uint32 + sender.SendPullQueryF = func(_ context.Context, polledNodeIDs set.Set[ids.NodeID], requestID uint32, _ ids.ID, _ uint64) { + require.Equal(set.Of(nodeID), polledNodeIDs) + pollRequestIDs = append(pollRequestIDs, requestID) + } + + getRequestIDs := make(map[ids.ID]uint32) + sender.SendGetF = func(_ context.Context, 
requestedNodeID ids.NodeID, requestID uint32, blkID ids.ID) { + require.Equal(nodeID, requestedNodeID) + getRequestIDs[blkID] = requestID + } + + // Issue block 0 to trigger poll 0. + require.NoError(engine.PushQuery( + context.Background(), + nodeID, + 0, + chain[0].Bytes(), + 0, + )) + require.Len(pollRequestIDs, 1) + require.Empty(getRequestIDs) + + // Update GetBlock to return the newly issued block 0. This is needed to + // enable the issuance of block 1. + vm.GetBlockF = MakeGetBlockF( + []*snowmantest.Block{snowmantest.Genesis}, + chain[:1], + ) + + // Vote for block 2 or block 1 in poll 0. This should trigger Get requests + // for both block 2 and block 1. + require.NoError(engine.Chits( + context.Background(), + nodeID, + pollRequestIDs[0], + chain[2].ID(), + chain[1].ID(), + snowmantest.GenesisID, + )) + require.Len(pollRequestIDs, 1) + require.Contains(getRequestIDs, chain[1].ID()) + require.Contains(getRequestIDs, chain[2].ID()) + + // Mark the request for block 2 as failed. This should not cause the poll to + // be applied as there is still an outstanding request for block 1. + require.NoError(engine.GetFailed( + context.Background(), + nodeID, + getRequestIDs[chain[2].ID()], + )) + require.Len(pollRequestIDs, 1) + + // Issue block 1. This should cause the poll to be applied to both block 0 + // and block 1. + require.NoError(engine.Put( + context.Background(), + nodeID, + getRequestIDs[chain[1].ID()], + chain[1].Bytes(), + )) + // Because Put added a new preferred block to the chain, a new poll will be + // created. + require.Len(pollRequestIDs, 2) + require.Equal(choices.Accepted, chain[0].Status()) + require.Equal(choices.Accepted, chain[1].Status()) + // Block 2 still hasn't been issued, so its status should remain + // Processing. 
+ require.Equal(choices.Processing, chain[2].Status()) +} + func TestGetProcessingAncestor(t *testing.T) { var ( ctx = snowtest.ConsensusContext( diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index f987faf2aac8..c57a1c733551 100644 --- a/snow/engine/snowman/voter.go +++ b/snow/engine/snowman/voter.go @@ -9,39 +9,25 @@ import ( "go.uber.org/zap" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow/engine/snowman/job" "github.com/ava-labs/avalanchego/utils/bag" - "github.com/ava-labs/avalanchego/utils/set" ) -// Voter records chits received from [vdr] once its dependencies are met. +var _ job.Job[ids.ID] = (*voter)(nil) + +// voter records chits received from [nodeID] once its dependencies are met. type voter struct { t *Transitive - vdr ids.NodeID + nodeID ids.NodeID requestID uint32 responseOptions []ids.ID - deps set.Set[ids.ID] -} - -func (v *voter) Dependencies() set.Set[ids.ID] { - return v.deps } -// Mark that a dependency has been met. -func (v *voter) Fulfill(ctx context.Context, id ids.ID) { - v.deps.Remove(id) - v.Update(ctx) -} - -// Abandon this attempt to record chits. -func (v *voter) Abandon(ctx context.Context, id ids.ID) { - v.Fulfill(ctx, id) -} - -func (v *voter) Update(ctx context.Context) { - if v.deps.Len() != 0 || v.t.errs.Errored() { - return - } - +// The resolution results from the dependencies of the voter aren't explicitly +// used. The responseOptions are used to determine which block to apply the vote +// to. The dependencies are only used to optimistically delay the application of +// the vote until the blocks have been issued. 
+func (v *voter) Execute(ctx context.Context, _ []ids.ID, _ []ids.ID) error { var ( vote ids.ID shouldVote bool @@ -60,13 +46,13 @@ func (v *voter) Update(ctx context.Context) { var results []bag.Bag[ids.ID] if shouldVote { v.t.selectedVoteIndex.Observe(float64(voteIndex)) - results = v.t.polls.Vote(v.requestID, v.vdr, vote) + results = v.t.polls.Vote(v.requestID, v.nodeID, vote) } else { - results = v.t.polls.Drop(v.requestID, v.vdr) + results = v.t.polls.Drop(v.requestID, v.nodeID) } if len(results) == 0 { - return + return nil } for _, result := range results { @@ -75,24 +61,20 @@ func (v *voter) Update(ctx context.Context) { zap.Stringer("result", &result), ) if err := v.t.Consensus.RecordPoll(ctx, result); err != nil { - v.t.errs.Add(err) + return err } } - if v.t.errs.Errored() { - return - } - if err := v.t.VM.SetPreference(ctx, v.t.Consensus.Preference()); err != nil { - v.t.errs.Add(err) - return + return err } if v.t.Consensus.NumProcessing() == 0 { v.t.Ctx.Log.Debug("Snowman engine can quiesce") - return + return nil } v.t.Ctx.Log.Debug("Snowman engine can't quiesce") v.t.repoll(ctx) + return nil } diff --git a/snow/event/blockable.go b/snow/event/blockable.go deleted file mode 100644 index 404e95c2aee3..000000000000 --- a/snow/event/blockable.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package event - -import ( - "context" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/set" -) - -// Blockable defines what an object must implement to be able to block on -// dependent events being completed. 
-type Blockable interface { - // IDs that this object is blocking on - Dependencies() set.Set[ids.ID] - // Notify this object that an event has been fulfilled - Fulfill(context.Context, ids.ID) - // Notify this object that an event has been abandoned - Abandon(context.Context, ids.ID) - // Update the state of this object without changing the status of any events - Update(context.Context) -} diff --git a/snow/event/blocker.go b/snow/event/blocker.go deleted file mode 100644 index 9c15ffb50604..000000000000 --- a/snow/event/blocker.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package event - -import ( - "context" - "fmt" - "strings" - - "github.com/ava-labs/avalanchego/ids" -) - -const ( - minBlockerSize = 16 -) - -// Blocker tracks Blockable events. -// Blocker is used to track events that require their dependencies to be -// fulfilled before them. Once a Blockable event is registered, it will be -// notified once any of its dependencies are fulfilled or abandoned. 
-type Blocker map[ids.ID][]Blockable - -func (b *Blocker) init() { - if *b == nil { - *b = make(map[ids.ID][]Blockable, minBlockerSize) - } -} - -// Returns the number of items that have dependencies waiting on -// them to be fulfilled -func (b *Blocker) Len() int { - return len(*b) -} - -// Fulfill notifies all objects blocking on the event whose ID is that -// the event has happened -func (b *Blocker) Fulfill(ctx context.Context, id ids.ID) { - b.init() - - blocking := (*b)[id] - delete(*b, id) - - for _, pending := range blocking { - pending.Fulfill(ctx, id) - } -} - -// Abandon notifies all objects blocking on the event whose ID is that -// the event has been abandoned -func (b *Blocker) Abandon(ctx context.Context, id ids.ID) { - b.init() - - blocking := (*b)[id] - delete(*b, id) - - for _, pending := range blocking { - pending.Abandon(ctx, id) - } -} - -// Register a new Blockable and its dependencies -func (b *Blocker) Register(ctx context.Context, pending Blockable) { - b.init() - - for pendingID := range pending.Dependencies() { - (*b)[pendingID] = append((*b)[pendingID], pending) - } - - pending.Update(ctx) -} - -// PrefixedString returns the same value as the String function, with all the -// new lines prefixed by [prefix] -func (b *Blocker) PrefixedString(prefix string) string { - b.init() - - sb := strings.Builder{} - sb.WriteString(fmt.Sprintf("Blocking on %d IDs:", len(*b))) - for key, value := range *b { - sb.WriteString(fmt.Sprintf("\n%sID[%s]: %d", - prefix, - key, - len(value), - )) - } - return strings.TrimSuffix(sb.String(), "\n") -} - -func (b *Blocker) String() string { - return b.PrefixedString("") -} diff --git a/snow/event/blocker_test.go b/snow/event/blocker_test.go deleted file mode 100644 index d7620bfebe1a..000000000000 --- a/snow/event/blocker_test.go +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. 
- -package event - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/set" -) - -func TestBlocker(t *testing.T) { - require := require.New(t) - - b := Blocker(nil) - - a := newTestBlockable() - - id0 := ids.GenerateTestID() - id1 := ids.GenerateTestID() - id2 := ids.GenerateTestID() - - calledDep := new(bool) - a.dependencies = func() set.Set[ids.ID] { - *calledDep = true - - s := set.Of(id0, id1) - return s - } - calledFill := new(bool) - a.fulfill = func(context.Context, ids.ID) { - *calledFill = true - } - calledAbandon := new(bool) - a.abandon = func(context.Context, ids.ID) { - *calledAbandon = true - } - calledUpdate := new(bool) - a.update = func(context.Context) { - *calledUpdate = true - } - - b.Register(context.Background(), a) - - require.True(*calledDep) - require.False(*calledFill) - require.False(*calledAbandon) - require.True(*calledUpdate) - - b.Fulfill(context.Background(), id2) - b.Abandon(context.Background(), id2) - - require.True(*calledDep) - require.False(*calledFill) - require.False(*calledAbandon) - require.True(*calledUpdate) - - b.Fulfill(context.Background(), id0) - - require.True(*calledDep) - require.True(*calledFill) - require.False(*calledAbandon) - require.True(*calledUpdate) - - b.Abandon(context.Background(), id0) - - require.True(*calledDep) - require.True(*calledFill) - require.False(*calledAbandon) - require.True(*calledUpdate) - - b.Abandon(context.Background(), id1) - - require.True(*calledDep) - require.True(*calledFill) - require.True(*calledAbandon) - require.True(*calledUpdate) -} - -type testBlockable struct { - dependencies func() set.Set[ids.ID] - fulfill func(context.Context, ids.ID) - abandon func(context.Context, ids.ID) - update func(context.Context) -} - -func newTestBlockable() *testBlockable { - return &testBlockable{ - dependencies: func() set.Set[ids.ID] { - return set.Set[ids.ID]{} - }, - fulfill: 
func(context.Context, ids.ID) {}, - abandon: func(context.Context, ids.ID) {}, - update: func(context.Context) {}, - } -} - -func (b *testBlockable) Dependencies() set.Set[ids.ID] { - return b.dependencies() -} - -func (b *testBlockable) Fulfill(ctx context.Context, id ids.ID) { - b.fulfill(ctx, id) -} - -func (b *testBlockable) Abandon(ctx context.Context, id ids.ID) { - b.abandon(ctx, id) -} - -func (b *testBlockable) Update(ctx context.Context) { - b.update(ctx) -}