Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: sched: Cache worker calls #9737

Merged
merged 3 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions lib/lazy/getonce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package lazy

import (
"context"
"sync"
)

// Lazy memoizes the outcome of a getter that takes no arguments. The getter
// runs at most once; the value and error it produced are stored and handed
// back on every later call to Val.
type Lazy[T any] struct {
	// Get is the underlying, uncached getter. Val invokes it at most once.
	Get func() (T, error)

	// once guards the single invocation of Get.
	once sync.Once

	// val and err record what the single Get invocation returned.
	val T
	err error
}

// MakeLazy builds a Lazy around get. Nothing is evaluated until Val is called.
func MakeLazy[T any](get func() (T, error)) *Lazy[T] {
	lz := new(Lazy[T])
	lz.Get = get
	return lz
}

// Val returns the memoized result of Get, invoking Get on the first call
// only. An error is memoized exactly like a value, so Get is not retried
// after a failure.
func (l *Lazy[T]) Val() (T, error) {
	l.once.Do(l.fill)
	return l.val, l.err
}

// fill runs Get and stores both of its results; it is the function handed to
// once.Do by Val.
func (l *Lazy[T]) fill() {
	l.val, l.err = l.Get()
}

// LazyCtx is the context-aware counterpart of a memoized getter: the wrapped
// function receives a context.Context, runs at most once, and its value and
// error are replayed on every later call to Val.
type LazyCtx[T any] struct {
	// Get is the underlying, uncached getter. Val invokes it at most once.
	Get func(context.Context) (T, error)

	// once guards the single invocation of Get.
	once sync.Once

	// val and err record what the single Get invocation returned.
	val T
	err error
}

// MakeLazyCtx builds a LazyCtx around get. Nothing is evaluated until Val is
// called.
func MakeLazyCtx[T any](get func(ctx context.Context) (T, error)) *LazyCtx[T] {
	lz := new(LazyCtx[T])
	lz.Get = get
	return lz
}

// Val returns the memoized result of Get. Only the first call runs Get, and
// it does so with that call's ctx; the ctx passed to later calls is not used.
// An error from the first call is memoized exactly like a value.
func (l *LazyCtx[T]) Val(ctx context.Context) (T, error) {
	fill := func() {
		v, err := l.Get(ctx)
		l.val, l.err = v, err
	}
	l.once.Do(fill)
	return l.val, l.err
}
14 changes: 8 additions & 6 deletions storage/sealer/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ const mib = 1 << 20

// WorkerAction is the signature of a function that is handed a context and a
// Worker and reports failure through its error result.
type WorkerAction func(ctx context.Context, w Worker) error

// SchedWorker is the view of a worker that WorkerSelector implementations
// receive in Ok and Cmp.
type SchedWorker interface {
// TaskTypes returns the set of task types associated with the worker.
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
// Paths returns the worker's storage paths.
Paths(context.Context) ([]storiface.StoragePath, error)
// Utilization returns a utilization figure for the worker.
// NOTE(review): the selectors in this change prefer the worker with the
// lower value — confirm the exact scale against the implementation.
Utilization() float64
}

type WorkerSelector interface {
// Ok is true if worker is acceptable for performing a task.
// If any worker is preferred for a task, other workers won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (ok, preferred bool, err error)

Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) // true if a is preferred over b
}

type Scheduler struct {
Expand Down Expand Up @@ -82,10 +88,6 @@ type Scheduler struct {
type WorkerHandle struct {
workerRpc Worker

tasksCache map[sealtasks.TaskType]struct{}
tasksUpdate time.Time
tasksLk sync.Mutex

Info storiface.WorkerInfo

preparing *ActiveResources // use with WorkerHandle.lk
Expand Down
13 changes: 10 additions & 3 deletions storage/sealer/sched_assigner_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opencensus.io/stats"

"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

type WindowSelector func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int
Expand Down Expand Up @@ -37,6 +38,11 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {

*/

cachedWorkers := &schedWorkerCache{
Workers: sh.Workers,
cached: map[storiface.WorkerID]*cachedSchedWorker{},
}

windowsLen := len(sh.OpenWindows)
queueLen := sh.SchedQueue.Len()

Expand All @@ -61,6 +67,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {

partDone := metrics.Timer(sh.mctx, metrics.SchedAssignerCandidatesDuration)
defer func() {
// call latest value of partDone in case we error out somewhere
partDone()
}()

Expand All @@ -81,7 +88,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
var havePreferred bool

for wnd, windowRequest := range sh.OpenWindows {
worker, ok := sh.Workers[windowRequest.Worker]
worker, ok := cachedWorkers.Get(windowRequest.Worker)
if !ok {
log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker)
// TODO: How to move forward here?
Expand Down Expand Up @@ -143,8 +150,8 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint
}

wi := sh.Workers[wii]
wj := sh.Workers[wji]
wi, _ := cachedWorkers.Get(wii)
wj, _ := cachedWorkers.Get(wji)

rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
defer cancel()
Expand Down
19 changes: 0 additions & 19 deletions storage/sealer/sched_resources.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package sealer

import (
"context"
"sync"
"time"

"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
Expand Down Expand Up @@ -185,20 +183,3 @@ func (wh *WorkerHandle) Utilization() float64 {

return u
}

var tasksCacheTimeout = 30 * time.Second

func (wh *WorkerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) {
wh.tasksLk.Lock()
defer wh.tasksLk.Unlock()

if wh.tasksCache == nil || time.Now().Sub(wh.tasksUpdate) > tasksCacheTimeout {
wh.tasksCache, err = wh.workerRpc.TaskTypes(ctx)
if err != nil {
return nil, err
}
wh.tasksUpdate = time.Now()
}

return wh.tasksCache, nil
}
32 changes: 27 additions & 5 deletions storage/sealer/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
prooftypes "github.com/filecoin-project/go-state-types/proof"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
Expand Down Expand Up @@ -587,18 +588,28 @@ func TestSched(t *testing.T) {

type slowishSelector bool

func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) {
time.Sleep(200 * time.Microsecond)
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (bool, bool, error) {
// note: we don't care about output here, just the time those calls take
// (selector Ok/Cmp is called in the scheduler)
_, _ = a.Paths(ctx)
_, _ = a.TaskTypes(ctx)
return bool(s), false, nil
}

func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
time.Sleep(100 * time.Microsecond)
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
// note: we don't care about output here, just the time those calls take
// (selector Ok/Cmp is called in the scheduler)
_, _ = a.Paths(ctx)
return true, nil
}

var _ WorkerSelector = slowishSelector(true)

// tw is a test helper that combines an api.Worker with an io.Closer by
// embedding both; BenchmarkTrySched uses it as the workerRpc of a
// WorkerHandle.
type tw struct {
api.Worker
io.Closer
}

func BenchmarkTrySched(b *testing.B) {
logging.SetAllLoggers(logging.LevelInfo)
defer logging.SetAllLoggers(logging.LevelDebug)
Expand All @@ -609,14 +620,25 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

var whnd api.WorkerStruct
whnd.Internal.TaskTypes = func(p0 context.Context) (map[sealtasks.TaskType]struct{}, error) {
time.Sleep(100 * time.Microsecond)
return nil, nil
}
whnd.Internal.Paths = func(p0 context.Context) ([]storiface.StoragePath, error) {
time.Sleep(100 * time.Microsecond)
return nil, nil
}

sched, err := newScheduler(ctx, "")
require.NoError(b, err)
sched.Workers[storiface.WorkerID{}] = &WorkerHandle{
workerRpc: nil,
workerRpc: &tw{Worker: &whnd},
Info: storiface.WorkerInfo{
Hostname: "t",
Resources: decentWorkerResources,
},
Enabled: true,
preparing: NewActiveResources(),
active: NewActiveResources(),
}
Expand Down
69 changes: 69 additions & 0 deletions storage/sealer/sched_worker_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package sealer

import (
"context"
"sync"

"github.com/filecoin-project/lotus/lib/lazy"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// schedWorkerCache caches scheduling-related calls to workers
//
// Get builds one cachedSchedWorker per worker ID from the entries in Workers
// the first time that ID is requested and reuses it on later lookups.
type schedWorkerCache struct {
// Workers is the map of worker handles that cache entries are built from.
Workers map[storiface.WorkerID]*WorkerHandle

// lk is held by Get for the whole lookup/creation of an entry.
lk sync.Mutex
// cached holds the entries built so far, keyed by worker ID. Get writes to
// this map, so it must be non-nil before Get is called.
cached map[storiface.WorkerID]*cachedSchedWorker
}

// Get returns the cached scheduling view of the worker with the given id,
// creating it on first use. The second result is false (and the first nil)
// when id is neither cached nor present in s.Workers.
func (s *schedWorkerCache) Get(id storiface.WorkerID) (*cachedSchedWorker, bool) {
	s.lk.Lock()
	defer s.lk.Unlock()

	// Fast path: an entry was already built for this worker.
	if hit, ok := s.cached[id]; ok {
		return hit, true
	}

	handle, ok := s.Workers[id]
	if !ok {
		return nil, false
	}

	// Nothing is fetched from the worker here; each getter is only wrapped
	// so that its first use can be memoized.
	entry := &cachedSchedWorker{
		tt:    lazy.MakeLazyCtx(handle.workerRpc.TaskTypes),
		paths: lazy.MakeLazyCtx(handle.workerRpc.Paths),
		utilization: lazy.MakeLazy(func() (float64, error) {
			return handle.Utilization(), nil
		}),

		// Enabled and Info are copied at entry-creation time.
		Enabled: handle.Enabled,
		Info:    handle.Info,
	}
	s.cached[id] = entry

	return entry, true
}

// cachedSchedWorker is the per-worker entry handed out by
// schedWorkerCache.Get. It wraps the worker's scheduling-related getters in
// lazy values and carries a copy of the handle's Enabled and Info fields.
type cachedSchedWorker struct {
// tt wraps the worker RPC's TaskTypes getter.
tt *lazy.LazyCtx[map[sealtasks.TaskType]struct{}]
// paths wraps the worker RPC's Paths getter.
paths *lazy.LazyCtx[[]storiface.StoragePath]
// utilization wraps WorkerHandle.Utilization; the wrapping getter always
// returns a nil error.
utilization *lazy.Lazy[float64]

// Enabled and Info are copied from the WorkerHandle when the entry is built.
Enabled bool
Info storiface.WorkerInfo
}

// TaskTypes returns the worker's task types through the lazy wrapper c.tt, so
// the underlying getter runs on the first call only and the same result
// (value or error) is returned afterwards.
func (c *cachedSchedWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
	supported, err := c.tt.Val(ctx)
	return supported, err
}

// Paths returns the worker's storage paths through the lazy wrapper c.paths,
// so the underlying getter runs on the first call only and the same result
// (value or error) is returned afterwards.
func (c *cachedSchedWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) {
	// Use Val, not the Get field: Get is the raw, uncached getter stored in
	// the LazyCtx, and calling it directly would hit the worker on every
	// invocation and defeat the cache. This matches TaskTypes.
	return c.paths.Val(ctx)
}

// Utilization returns the worker's utilization through the lazy wrapper
// c.utilization, so the underlying computation runs on the first call only.
func (c *cachedSchedWorker) Utilization() float64 {
	// The error result is dropped on purpose: the getter installed by
	// schedWorkerCache.Get always returns a nil error.
	util, _ := c.utilization.Val()
	return util
}

// Compile-time assertion that *cachedSchedWorker implements SchedWorker.
var _ SchedWorker = &cachedSchedWorker{}
6 changes: 3 additions & 3 deletions storage/sealer/selector_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func newAllocSelector(index paths.SectorIndex, alloc storiface.SectorFileType, p
}
}

func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand All @@ -35,7 +35,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return requested == storiface.FTNone, false, nil
}

func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

Expand Down
6 changes: 3 additions & 3 deletions storage/sealer/selector_existing.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newExistingSelector(index paths.SectorIndex, sector abi.SectorID, alloc sto
}
}

func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand All @@ -37,7 +37,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return requested == storiface.FTNone, false, nil
}

func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

Expand Down
6 changes: 3 additions & 3 deletions storage/sealer/selector_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newMoveSelector(index paths.SectorIndex, sector abi.SectorID, alloc storifa
}
}

func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand All @@ -39,7 +39,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return false, false, nil
}

paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return (ok && s.allowRemote) || pref, pref, nil
}

func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

Expand Down
4 changes: 2 additions & 2 deletions storage/sealer/selector_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func newTaskSelector() *taskSelector {
return &taskSelector{}
}

func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand All @@ -29,7 +29,7 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return supported, false, nil
}

func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b SchedWorker) (bool, error) {
atasks, err := a.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
Expand Down