From 5881edb75e5e5946470f0cf53c3db02172417075 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 28 Nov 2022 17:21:56 +0100 Subject: [PATCH 1/3] feat: sched: Cache worker calls --- lib/lazy/getonce.go | 50 ++++++++++++++++++ storage/sealer/sched.go | 16 +++--- storage/sealer/sched_assigner_common.go | 12 +++-- storage/sealer/sched_resources.go | 22 +------- storage/sealer/sched_worker_cache.go | 69 +++++++++++++++++++++++++ storage/sealer/selector_alloc.go | 6 +-- storage/sealer/selector_existing.go | 6 +-- storage/sealer/selector_move.go | 6 +-- storage/sealer/selector_task.go | 4 +- 9 files changed, 149 insertions(+), 42 deletions(-) create mode 100644 lib/lazy/getonce.go create mode 100644 storage/sealer/sched_worker_cache.go diff --git a/lib/lazy/getonce.go b/lib/lazy/getonce.go new file mode 100644 index 00000000000..7f9d4d664df --- /dev/null +++ b/lib/lazy/getonce.go @@ -0,0 +1,50 @@ +package lazy + +import ( + "context" + "sync" +) + +type Lazy[T any] struct { + Get func() (T, error) + + once sync.Once + + val T + err error +} + +func MakeLazy[T any](get func() (T, error)) *Lazy[T] { + return &Lazy[T]{ + Get: get, + } +} + +func (l *Lazy[T]) Val() (T, error) { + l.once.Do(func() { + l.val, l.err = l.Get() + }) + return l.val, l.err +} + +type LazyCtx[T any] struct { + Get func(context.Context) (T, error) + + once sync.Once + + val T + err error +} + +func MakeLazyCtx[T any](get func(ctx context.Context) (T, error)) *LazyCtx[T] { + return &LazyCtx[T]{ + Get: get, + } +} + +func (l *LazyCtx[T]) Val(ctx context.Context) (T, error) { + l.once.Do(func() { + l.val, l.err = l.Get(ctx) + }) + return l.val, l.err +} diff --git a/storage/sealer/sched.go b/storage/sealer/sched.go index 42a28d2d0ad..1a1b0fdb9b1 100644 --- a/storage/sealer/sched.go +++ b/storage/sealer/sched.go @@ -43,12 +43,18 @@ const mib = 1 << 20 type WorkerAction func(ctx context.Context, w Worker) error +type SchedWorker interface { + TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) + Paths(context.Context) ([]storiface.StoragePath, error) + Utilization() float64 +} + type WorkerSelector interface { // Ok is true if worker is acceptable for performing a task. // If any worker is preferred for a task, other workers won't be considered for that task. - Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error) + Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (ok, preferred bool, err error) - Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b + Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) // true if a is preferred over b } type Scheduler struct { @@ -81,11 +87,7 @@ type Scheduler struct { type WorkerHandle struct { workerRpc Worker - - tasksCache map[sealtasks.TaskType]struct{} - tasksUpdate time.Time - tasksLk sync.Mutex - + Info storiface.WorkerInfo preparing *ActiveResources // use with WorkerHandle.lk diff --git a/storage/sealer/sched_assigner_common.go b/storage/sealer/sched_assigner_common.go index dda85efdc68..09518848b53 100644 --- a/storage/sealer/sched_assigner_common.go +++ b/storage/sealer/sched_assigner_common.go @@ -2,6 +2,7 @@ package sealer import ( "context" + "github.com/filecoin-project/lotus/storage/sealer/storiface" "math/rand" "sort" "sync" @@ -37,6 +38,11 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) { */ + cachedWorkers := &schedWorkerCache{ + Workers: sh.Workers, + cached: map[storiface.WorkerID]*cachedSchedWorker{}, + } + windowsLen := len(sh.OpenWindows) queueLen := sh.SchedQueue.Len() @@ -81,7 +87,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) { var havePreferred bool for wnd, windowRequest := range sh.OpenWindows { - worker, ok := sh.Workers[windowRequest.Worker] + worker, ok := cachedWorkers.Get(windowRequest.Worker) if !ok { log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker) // TODO: How to move forward here? @@ -143,8 +149,8 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) { return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint } - wi := sh.Workers[wii] - wj := sh.Workers[wji] + wi, _ := cachedWorkers.Get(wii) + wj, _ := cachedWorkers.Get(wji) rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout) defer cancel() diff --git a/storage/sealer/sched_resources.go b/storage/sealer/sched_resources.go index 88725f6bacc..232e86e524d 100644 --- a/storage/sealer/sched_resources.go +++ b/storage/sealer/sched_resources.go @@ -1,12 +1,9 @@ package sealer import ( - "context" - "sync" - "time" - "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" + "sync" ) type ActiveResources struct { @@ -185,20 +182,3 @@ func (wh *WorkerHandle) Utilization() float64 { return u } - -var tasksCacheTimeout = 30 * time.Second - -func (wh *WorkerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) { - wh.tasksLk.Lock() - defer wh.tasksLk.Unlock() - - if wh.tasksCache == nil || time.Now().Sub(wh.tasksUpdate) > tasksCacheTimeout { - wh.tasksCache, err = wh.workerRpc.TaskTypes(ctx) - if err != nil { - return nil, err - } - wh.tasksUpdate = time.Now() - } - - return wh.tasksCache, nil -} diff --git a/storage/sealer/sched_worker_cache.go b/storage/sealer/sched_worker_cache.go new file mode 100644 index 00000000000..a17bf567464 --- /dev/null +++ b/storage/sealer/sched_worker_cache.go @@ -0,0 +1,69 @@ +package sealer + +import ( + "context" + "sync" + + "github.com/filecoin-project/lotus/lib/lazy" + "github.com/filecoin-project/lotus/storage/sealer/sealtasks" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +// schedWorkerCache caches scheduling-related calls to workers +type schedWorkerCache struct { + Workers map[storiface.WorkerID]*WorkerHandle + + lk sync.Mutex + cached map[storiface.WorkerID]*cachedSchedWorker +} + +func (s *schedWorkerCache) Get(id storiface.WorkerID) (*cachedSchedWorker, bool) { + s.lk.Lock() + defer s.lk.Unlock() + + if _, found := s.cached[id]; !found { + if _, found := s.Workers[id]; !found { + return nil, false + } + + whnd := s.Workers[id] + + s.cached[id] = &cachedSchedWorker{ + tt: lazy.MakeLazyCtx(whnd.workerRpc.TaskTypes), + paths: lazy.MakeLazyCtx(whnd.workerRpc.Paths), + utilization: lazy.MakeLazy(func() (float64, error) { + return whnd.Utilization(), nil + }), + + Enabled: whnd.Enabled, + Info: whnd.Info, + } + } + + return s.cached[id], true +} + +type cachedSchedWorker struct { + tt *lazy.LazyCtx[map[sealtasks.TaskType]struct{}] + paths *lazy.LazyCtx[[]storiface.StoragePath] + utilization *lazy.Lazy[float64] + + Enabled bool + Info storiface.WorkerInfo +} + +func (c *cachedSchedWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) { + return c.tt.Val(ctx) +} + +func (c *cachedSchedWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) { + return c.paths.Get(ctx) +} + +func (c *cachedSchedWorker) Utilization() float64 { + // can't error + v, _ := c.utilization.Val() + return v +} + +var _ SchedWorker = &cachedSchedWorker{} diff --git a/storage/sealer/selector_alloc.go b/storage/sealer/selector_alloc.go index ce64820f79b..130f74461f5 100644 --- a/storage/sealer/selector_alloc.go +++ b/storage/sealer/selector_alloc.go @@ -26,7 +26,7 @@ func newAllocSelector(index paths.SectorIndex, alloc storiface.SectorFileType, p } } -func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) { +func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -35,7 +35,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi return false, false, nil } - paths, err := whnd.workerRpc.Paths(ctx) + paths, err := whnd.Paths(ctx) if err != nil { return false, false, xerrors.Errorf("getting worker paths: %w", err) } @@ -71,7 +71,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi return requested == storiface.FTNone, false, nil } -func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { +func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) { return a.Utilization() < b.Utilization(), nil } diff --git a/storage/sealer/selector_existing.go b/storage/sealer/selector_existing.go index 830213f1eae..c1e082db8a5 100644 --- a/storage/sealer/selector_existing.go +++ b/storage/sealer/selector_existing.go @@ -28,7 +28,7 @@ func newExistingSelector(index paths.SectorIndex, sector abi.SectorID, alloc sto } } -func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) { +func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -37,7 +37,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt return false, false, nil } - paths, err := whnd.workerRpc.Paths(ctx) + paths, err := whnd.Paths(ctx) if err != nil { return false, false, xerrors.Errorf("getting worker paths: %w", err) } @@ -78,7 +78,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt return requested == storiface.FTNone, false, nil } -func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { +func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) { return a.Utilization() < b.Utilization(), nil } diff --git a/storage/sealer/selector_move.go b/storage/sealer/selector_move.go index c1f40245636..fde4b3c59cd 100644 --- a/storage/sealer/selector_move.go +++ b/storage/sealer/selector_move.go @@ -30,7 +30,7 @@ func newMoveSelector(index paths.SectorIndex, sector abi.SectorID, alloc storifa } } -func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) { +func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -39,7 +39,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi. return false, false, nil } - paths, err := whnd.workerRpc.Paths(ctx) + paths, err := whnd.Paths(ctx) if err != nil { return false, false, xerrors.Errorf("getting worker paths: %w", err) } @@ -99,7 +99,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi. return (ok && s.allowRemote) || pref, pref, nil } -func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { +func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) { return a.Utilization() < b.Utilization(), nil } diff --git a/storage/sealer/selector_task.go b/storage/sealer/selector_task.go index cc12b514e56..805fcbbd03b 100644 --- a/storage/sealer/selector_task.go +++ b/storage/sealer/selector_task.go @@ -19,7 +19,7 @@ func newTaskSelector() *taskSelector { return &taskSelector{} } -func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) { +func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) { tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, false, xerrors.Errorf("getting supported worker task types: %w", err) @@ -29,7 +29,7 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi. return supported, false, nil } -func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { +func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b SchedWorker) (bool, error) { atasks, err := a.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) From 5a458a60c21d9d3a9e4c6711f58ed4833d115fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 28 Nov 2022 18:02:16 +0100 Subject: [PATCH 2/3] fix sched_test --- storage/sealer/sched.go | 2 +- storage/sealer/sched_assigner_common.go | 2 +- storage/sealer/sched_resources.go | 3 ++- storage/sealer/sched_test.go | 28 ++++++++++++++++++++----- 4 files changed, 27 insertions(+), 8 deletions(-) diff --git a/storage/sealer/sched.go b/storage/sealer/sched.go index 1a1b0fdb9b1..c2b7d6a2d67 100644 --- a/storage/sealer/sched.go +++ b/storage/sealer/sched.go @@ -87,7 +87,7 @@ type Scheduler struct { type WorkerHandle struct { workerRpc Worker - + Info storiface.WorkerInfo preparing *ActiveResources // use with WorkerHandle.lk diff --git a/storage/sealer/sched_assigner_common.go b/storage/sealer/sched_assigner_common.go index 09518848b53..3aad91cc357 100644 --- a/storage/sealer/sched_assigner_common.go +++ b/storage/sealer/sched_assigner_common.go @@ -2,7 +2,6 @@ package sealer import ( "context" - "github.com/filecoin-project/lotus/storage/sealer/storiface" "math/rand" "sort" "sync" @@ -10,6 +9,7 @@ import ( "go.opencensus.io/stats" "github.com/filecoin-project/lotus/metrics" + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) type WindowSelector func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int diff --git a/storage/sealer/sched_resources.go b/storage/sealer/sched_resources.go index 232e86e524d..487e294a22c 100644 --- a/storage/sealer/sched_resources.go +++ b/storage/sealer/sched_resources.go @@ -1,9 +1,10 @@ package sealer import ( + "sync" + "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" - "sync" ) type ActiveResources struct { diff --git a/storage/sealer/sched_test.go b/storage/sealer/sched_test.go index 22e7a98d78d..812c906e416 100644 --- a/storage/sealer/sched_test.go +++ b/storage/sealer/sched_test.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" prooftypes "github.com/filecoin-project/go-state-types/proof" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/sealtasks" @@ -587,18 +588,24 @@ func TestSched(t *testing.T) { type slowishSelector bool -func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) { - time.Sleep(200 * time.Microsecond) +func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (bool, bool, error) { + _, _ = a.Paths(ctx) + _, _ = a.TaskTypes(ctx) return bool(s), false, nil } -func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) { - time.Sleep(100 * time.Microsecond) +func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) { + _, _ = a.Paths(ctx) return true, nil } var _ WorkerSelector = slowishSelector(true) +type tw struct { + api.Worker + io.Closer +} + func BenchmarkTrySched(b *testing.B) { logging.SetAllLoggers(logging.LevelInfo) defer logging.SetAllLoggers(logging.LevelDebug) @@ -609,14 +616,25 @@ func BenchmarkTrySched(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() + var whnd api.WorkerStruct + whnd.Internal.TaskTypes = func(p0 context.Context) (map[sealtasks.TaskType]struct{}, error) { + time.Sleep(100 * time.Microsecond) + return nil, nil + } + whnd.Internal.Paths = func(p0 context.Context) ([]storiface.StoragePath, error) { + time.Sleep(100 * time.Microsecond) + return nil, nil + } + sched, err := newScheduler(ctx, "") require.NoError(b, err) sched.Workers[storiface.WorkerID{}] = &WorkerHandle{ - workerRpc: nil, + workerRpc: &tw{Worker: &whnd}, Info: storiface.WorkerInfo{ Hostname: "t", Resources: decentWorkerResources, }, + Enabled: true, preparing: NewActiveResources(), active: NewActiveResources(), } From 1597e8590644e761642aecb05a73e7640a79abb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 29 Nov 2022 11:45:31 +0100 Subject: [PATCH 3/3] sched: Address review --- storage/sealer/sched_assigner_common.go | 1 + storage/sealer/sched_test.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/storage/sealer/sched_assigner_common.go b/storage/sealer/sched_assigner_common.go index 3aad91cc357..bf92dbf15ba 100644 --- a/storage/sealer/sched_assigner_common.go +++ b/storage/sealer/sched_assigner_common.go @@ -67,6 +67,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) { partDone := metrics.Timer(sh.mctx, metrics.SchedAssignerCandidatesDuration) defer func() { + // call latest value of partDone in case we error out somewhere partDone() }() diff --git a/storage/sealer/sched_test.go b/storage/sealer/sched_test.go index 812c906e416..2eed1ce7389 100644 --- a/storage/sealer/sched_test.go +++ b/storage/sealer/sched_test.go @@ -589,12 +589,16 @@ func TestSched(t *testing.T) { type slowishSelector bool func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (bool, bool, error) { + // note: we don't care about output here, just the time those calls take + // (selector Ok/Cmp is called in the scheduler) _, _ = a.Paths(ctx) _, _ = a.TaskTypes(ctx) return bool(s), false, nil } func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) { + // note: we don't care about output here, just the time those calls take + // (selector Ok/Cmp is called in the scheduler) _, _ = a.Paths(ctx) return true, nil }