From 42502cef85eb2573af8cf24668ab5148026b4f87 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 14:27:27 -0400 Subject: [PATCH 1/3] Remove job cancellation --- snow/engine/snowman/issuer.go | 13 +- snow/engine/snowman/job/scheduler.go | 83 ++++----- snow/engine/snowman/job/scheduler_test.go | 197 +++++++++++++++------- snow/engine/snowman/voter.go | 12 +- 4 files changed, 183 insertions(+), 122 deletions(-) diff --git a/snow/engine/snowman/issuer.go b/snow/engine/snowman/issuer.go index efb23a90b5e9..9af5fb9716ac 100644 --- a/snow/engine/snowman/issuer.go +++ b/snow/engine/snowman/issuer.go @@ -13,7 +13,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/snowman/job" ) -var _ job.Job = (*issuer)(nil) +var _ job.Job[ids.ID] = (*issuer)(nil) // issuer issues [blk] into to consensus after its dependencies are met. type issuer struct { @@ -24,11 +24,14 @@ type issuer struct { issuedMetric prometheus.Counter } -func (i *issuer) Execute(ctx context.Context) error { - return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric) -} +func (i *issuer) Execute(ctx context.Context, _ []ids.ID, abandoned []ids.ID) error { + if len(abandoned) == 0 { + // If the parent block wasn't abandoned, this block can be issued. + return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric) + } -func (i *issuer) Cancel(ctx context.Context) error { + // If the parent block was abandoned, this block should be abandoned as + // well. blkID := i.blk.ID() i.t.removeFromPending(i.blk) i.t.addToNonVerifieds(i.blk) diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go index 163f51c2c2d9..a40a80b9e01a 100644 --- a/snow/engine/snowman/job/scheduler.go +++ b/snow/engine/snowman/job/scheduler.go @@ -7,29 +7,25 @@ package job import ( "context" - - "github.com/ava-labs/avalanchego/utils/set" ) -// Job is a unit of work that can be executed or cancelled. -type Job interface { - Execute(context.Context) error - Cancel(context.Context) error +// Job is a unit of work that can be executed based on the result of resolving +// requested dependencies. +type Job[T any] interface { + Execute(ctx context.Context, fulfilled []T, abandoned []T) error } type job[T comparable] struct { - // If empty, the job is ready to be executed. - dependencies set.Set[T] - // If true, the job should be cancelled. - shouldCancel bool - // If nil, the job has already been executed or cancelled. - job Job + // Once all dependencies are resolved, the job will be executed. + numUnresolved int + fulfilled []T + abandoned []T + job Job[T] } // Scheduler implements a dependency graph for jobs. Jobs can be registered with -// dependencies, and once all dependencies are fulfilled, the job will be -// executed. If any of the dependencies are abandoned, the job will be -// cancelled. +// dependencies, and once all dependencies are resolved, the job will be +// executed. type Scheduler[T comparable] struct { // dependents maps a dependency to the jobs that depend on it. dependents map[T][]*job[T] @@ -41,21 +37,22 @@ func NewScheduler[T comparable]() *Scheduler[T] { } } -// Register a job that should be executed once all of its dependencies are -// fulfilled. In order to prevent a memory leak, all dependencies must -// eventually either be fulfilled or abandoned. +// Register a job to be executed once all of its dependencies are resolved. +// +// In order to prevent a memory leak, all dependencies must eventually either be +// fulfilled or abandoned. // // While registering a job with duplicate dependencies is discouraged, it is -// allowed and treated similarly to registering the job with the dependencies -// de-duplicated. -func (s *Scheduler[T]) Register(ctx context.Context, userJob Job, dependencies ...T) error { +// allowed. +func (s *Scheduler[T]) Register(ctx context.Context, userJob Job[T], dependencies ...T) error { + numUnresolved := len(dependencies) if len(dependencies) == 0 { - return userJob.Execute(ctx) + return userJob.Execute(ctx, nil, nil) } j := &job[T]{ - dependencies: set.Of(dependencies...), - job: userJob, + numUnresolved: numUnresolved, + job: userJob, } for _, d := range dependencies { s.dependents[d] = append(s.dependents[d], j) @@ -69,51 +66,43 @@ func (s *Scheduler[_]) NumDependencies() int { return len(s.dependents) } -// Fulfill a dependency. If all dependencies for a job are fulfilled, the job +// Fulfill a dependency. If all dependencies for a job are resolved, the job // will be executed. // // It is safe to call the scheduler during the execution of a job. func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error { - return s.resolveDependency(ctx, dependency, false) + return s.resolveDependency(ctx, dependency, true) } -// Abandon a dependency. If any dependencies for a job are abandoned, the job -// will be cancelled. The job will only be cancelled once all dependencies are -// resolved. +// Abandon a dependency. If all dependencies for a job are resolved, the job +// will be executed. // -// It is safe to call the scheduler during the cancelling of a job. +// It is safe to call the scheduler during the execution of a job. func (s *Scheduler[T]) Abandon(ctx context.Context, dependency T) error { - return s.resolveDependency(ctx, dependency, true) + return s.resolveDependency(ctx, dependency, false) } func (s *Scheduler[T]) resolveDependency( ctx context.Context, dependency T, - shouldCancel bool, + fulfilled bool, ) error { jobs := s.dependents[dependency] delete(s.dependents, dependency) for _, job := range jobs { - job.dependencies.Remove(dependency) - job.shouldCancel = shouldCancel || job.shouldCancel + job.numUnresolved-- + if fulfilled { + job.fulfilled = append(job.fulfilled, dependency) + } else { + job.abandoned = append(job.abandoned, dependency) + } - userJob := job.job - if userJob == nil || job.dependencies.Len() != 0 { + if job.numUnresolved > 0 { continue } - // Mark the job as handled so that any reentrant calls do not interact - // with this job again. - job.job = nil - - var err error - if job.shouldCancel { - err = userJob.Cancel(ctx) - } else { - err = userJob.Execute(ctx) - } - if err != nil { + if err := job.job.Execute(ctx, job.fulfilled, job.abandoned); err != nil { return err } } diff --git a/snow/engine/snowman/job/scheduler_test.go b/snow/engine/snowman/job/scheduler_test.go index 9e5169fd0b83..32844e9382f8 100644 --- a/snow/engine/snowman/job/scheduler_test.go +++ b/snow/engine/snowman/job/scheduler_test.go @@ -9,8 +9,6 @@ import ( "testing" "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/utils/set" ) const ( @@ -18,48 +16,44 @@ const ( depToNeglect ) -var errDuplicateInvocation = errors.New("job already handled") +var errDuplicateExecution = errors.New("job already executed") type testJob struct { calledExecute bool - calledCancel bool + fulfilled []int + abandoned []int } -func (j *testJob) Execute(context.Context) error { +func (j *testJob) Execute(_ context.Context, fulfilled []int, abandoned []int) error { if j.calledExecute { - return errDuplicateInvocation + return errDuplicateExecution } j.calledExecute = true - return nil -} - -func (j *testJob) Cancel(context.Context) error { - if j.calledCancel { - return errDuplicateInvocation - } - j.calledCancel = true + j.fulfilled = fulfilled + j.abandoned = abandoned return nil } func (j *testJob) reset() { j.calledExecute = false - j.calledCancel = false + j.fulfilled = nil + j.abandoned = nil } func newSchedulerWithJob[T comparable]( t *testing.T, - job Job, - shouldCancel bool, - dependencies ...T, + job Job[T], + dependencies []T, + fulfilled []T, + abandoned []T, ) *Scheduler[T] { q := NewScheduler[T]() require.NoError(t, q.Register(context.Background(), job, dependencies...)) - if shouldCancel { - for _, jobs := range q.dependents { - for _, j := range jobs { - j.shouldCancel = true - } - } + for _, d := range fulfilled { + require.NoError(t, q.Fulfill(context.Background(), d)) + } + for _, d := range abandoned { + require.NoError(t, q.Abandon(context.Background(), d)) } return q } @@ -92,8 +86,10 @@ func TestScheduler_Register(t *testing.T) { dependents: map[int][]*job[int]{ depToResolve: { { - dependencies: set.Of(depToResolve), - job: userJob, + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, }, }, }, @@ -109,14 +105,18 @@ func TestScheduler_Register(t *testing.T) { dependents: map[int][]*job[int]{ depToResolve: { { - dependencies: set.Of(depToResolve, depToNeglect), - job: userJob, + numUnresolved: 2, + fulfilled: nil, + abandoned: nil, + job: userJob, }, }, depToNeglect: { { - dependencies: set.Of(depToResolve, depToNeglect), - job: userJob, + numUnresolved: 2, + fulfilled: nil, + abandoned: nil, + job: userJob, }, }, }, @@ -124,7 +124,7 @@ func TestScheduler_Register(t *testing.T) { }, { name: "additional dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), dependencies: []int{depToResolve}, wantExecuted: false, wantNumDependencies: 1, @@ -132,12 +132,16 @@ func TestScheduler_Register(t *testing.T) { dependents: map[int][]*job[int]{ depToResolve: { { - dependencies: set.Of(depToResolve), - job: userJob, + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, }, { - dependencies: set.Of(depToResolve), - job: userJob, + numUnresolved: 1, + fulfilled: nil, + abandoned: nil, + job: userJob, }, }, }, @@ -154,7 +158,8 @@ func TestScheduler_Register(t *testing.T) { require.NoError(test.scheduler.Register(context.Background(), userJob, test.dependencies...)) require.Equal(test.wantNumDependencies, test.scheduler.NumDependencies()) require.Equal(test.wantExecuted, userJob.calledExecute) - require.False(userJob.calledCancel) + require.Empty(userJob.fulfilled) + require.Empty(userJob.abandoned) require.Equal(test.wantScheduler, test.scheduler) }) } @@ -165,37 +170,68 @@ func TestScheduler_Fulfill(t *testing.T) { tests := []struct { name string scheduler *Scheduler[int] - wantExecute bool + wantExecuted bool + wantFulfilled []int + wantAbandoned []int wantScheduler *Scheduler[int] }{ { name: "no jobs", scheduler: NewScheduler[int](), - wantExecute: false, + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, wantScheduler: NewScheduler[int](), }, { name: "single dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), - wantExecute: true, + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + wantExecuted: true, + wantFulfilled: []int{depToResolve}, + wantAbandoned: nil, wantScheduler: NewScheduler[int](), }, { name: "non-existent dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), - wantExecute: false, - wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, + wantScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), }, { name: "incomplete dependencies", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToNeglect), - wantExecute: false, - wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, + wantScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToNeglect: { + { + numUnresolved: 1, + fulfilled: []int{depToResolve}, + abandoned: nil, + job: userJob, + }, + }, + }, + }, }, { name: "duplicate dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToResolve), - wantExecute: true, + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), + wantExecuted: true, + wantFulfilled: []int{depToResolve, depToResolve}, + wantAbandoned: nil, + wantScheduler: NewScheduler[int](), + }, + { + name: "previously abandoned", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, []int{depToNeglect}), + wantExecuted: true, + wantFulfilled: []int{depToResolve}, + wantAbandoned: []int{depToNeglect}, wantScheduler: NewScheduler[int](), }, } @@ -207,8 +243,9 @@ func TestScheduler_Fulfill(t *testing.T) { userJob.reset() require.NoError(test.scheduler.Fulfill(context.Background(), depToResolve)) - require.Equal(test.wantExecute, userJob.calledExecute) - require.False(userJob.calledCancel) + require.Equal(test.wantExecuted, userJob.calledExecute) + require.Equal(test.wantFulfilled, userJob.fulfilled) + require.Equal(test.wantAbandoned, userJob.abandoned) require.Equal(test.wantScheduler, test.scheduler) }) } @@ -219,37 +256,68 @@ func TestScheduler_Abandon(t *testing.T) { tests := []struct { name string scheduler *Scheduler[int] - wantCancelled bool + wantExecuted bool + wantFulfilled []int + wantAbandoned []int wantScheduler *Scheduler[int] }{ { name: "no jobs", scheduler: NewScheduler[int](), - wantCancelled: false, + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, wantScheduler: NewScheduler[int](), }, { name: "single dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve), - wantCancelled: true, + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve}, nil, nil), + wantExecuted: true, + wantFulfilled: nil, + wantAbandoned: []int{depToResolve}, wantScheduler: NewScheduler[int](), }, { name: "non-existent dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), - wantCancelled: false, - wantScheduler: newSchedulerWithJob(t, userJob, false, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, + wantScheduler: newSchedulerWithJob(t, userJob, []int{depToNeglect}, nil, nil), }, { name: "incomplete dependencies", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToNeglect), - wantCancelled: false, - wantScheduler: newSchedulerWithJob(t, userJob, true, depToNeglect), + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, nil, nil), + wantExecuted: false, + wantFulfilled: nil, + wantAbandoned: nil, + wantScheduler: &Scheduler[int]{ + dependents: map[int][]*job[int]{ + depToNeglect: { + { + numUnresolved: 1, + fulfilled: nil, + abandoned: []int{depToResolve}, + job: userJob, + }, + }, + }, + }, }, { name: "duplicate dependency", - scheduler: newSchedulerWithJob(t, userJob, false, depToResolve, depToResolve), - wantCancelled: true, + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToResolve}, nil, nil), + wantExecuted: true, + wantFulfilled: nil, + wantAbandoned: []int{depToResolve, depToResolve}, + wantScheduler: NewScheduler[int](), + }, + { + name: "previously fulfilled", + scheduler: newSchedulerWithJob(t, userJob, []int{depToResolve, depToNeglect}, []int{depToNeglect}, nil), + wantExecuted: true, + wantFulfilled: []int{depToNeglect}, + wantAbandoned: []int{depToResolve}, wantScheduler: NewScheduler[int](), }, } @@ -261,8 +329,9 @@ func TestScheduler_Abandon(t *testing.T) { userJob.reset() require.NoError(test.scheduler.Abandon(context.Background(), depToResolve)) - require.False(userJob.calledExecute) - require.Equal(test.wantCancelled, userJob.calledCancel) + require.Equal(test.wantExecuted, userJob.calledExecute) + require.Equal(test.wantFulfilled, userJob.fulfilled) + require.Equal(test.wantAbandoned, userJob.abandoned) require.Equal(test.wantScheduler, test.scheduler) }) } diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index 4332e969dad7..c57a1c733551 100644 --- a/snow/engine/snowman/voter.go +++ b/snow/engine/snowman/voter.go @@ -13,7 +13,7 @@ import ( "github.com/ava-labs/avalanchego/utils/bag" ) -var _ job.Job = (*voter)(nil) +var _ job.Job[ids.ID] = (*voter)(nil) // Voter records chits received from [nodeID] once its dependencies are met. type voter struct { @@ -23,7 +23,11 @@ type voter struct { responseOptions []ids.ID } -func (v *voter) Execute(ctx context.Context) error { +// The resolution results from the dependencies of the voter aren't explicitly +// used. The responseOptions are used to determine which block to apply the vote +// to. The dependencies are only used to optimistically delay the application of +// the vote until the blocks have been issued. +func (v *voter) Execute(ctx context.Context, _ []ids.ID, _ []ids.ID) error { var ( vote ids.ID shouldVote bool @@ -74,7 +78,3 @@ func (v *voter) Execute(ctx context.Context) error { v.t.repoll(ctx) return nil } - -func (v *voter) Cancel(ctx context.Context) error { - return v.Execute(ctx) -} From 7f286f0b5c481fa8a01313b13edc90b5ea071c38 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 14:41:14 -0400 Subject: [PATCH 2/3] nit --- snow/engine/snowman/job/scheduler.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go index 243856a3bf27..49b6d555869d 100644 --- a/snow/engine/snowman/job/scheduler.go +++ b/snow/engine/snowman/job/scheduler.go @@ -5,9 +5,7 @@ // dependencies. package job -import ( - "context" -) +import "context" // Job is a unit of work that can be executed based on the result of resolving // requested dependencies. From a9123993fa03c06124fb0c7e2f173fdf358cda39 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 18 Jun 2024 17:01:16 -0400 Subject: [PATCH 3/3] whoopse --- snow/engine/snowman/job/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snow/engine/snowman/job/scheduler.go b/snow/engine/snowman/job/scheduler.go index 49b6d555869d..bba746f96629 100644 --- a/snow/engine/snowman/job/scheduler.go +++ b/snow/engine/snowman/job/scheduler.go @@ -44,7 +44,7 @@ func NewScheduler[T comparable]() *Scheduler[T] { // allowed. func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job[T], dependencies ...T) error { numUnresolved := len(dependencies) - if len(dependencies) == 0 { + if numUnresolved == 0 { return userJob.Execute(ctx, nil, nil) }