
Commit

refactor
jmdeal committed Feb 14, 2025
1 parent 3199e17 commit 603d549
Showing 5 changed files with 50 additions and 33 deletions.
14 changes: 1 addition & 13 deletions pkg/controllers/disruption/helpers.go
@@ -79,19 +79,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
pods = append(pods, n.reschedulablePods...)
}
pods = append(pods, deletingNodePods...)
scheduler, err := provisioner.NewScheduler(
log.IntoContext(ctx, operatorlogging.NopLogger),
pods,
stateNodes,
// ReservedOfferingModeFallback is used for the following reasons:
// - For consolidation, we're only going to accept a decision if it lowers the cost of the cluster, and if it only
// requires a single additional nodeclaim. It doesn't matter in this scenario if we fallback.
// - For drift, fallback is required to ensure progress. Progress is only ensured with strict if multiple scheduling
// loops are allowed to proceed, but we need to ensure all pods on the drifted node are scheduled within a single
// iteration. This may result in non-ideal instance choices, but the alternative is deadlock.
// See issue TODO for more details.
scheduling.ReservedOfferingModeFallback,
)
scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes)
if err != nil {
return scheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
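With this change, SimulateScheduling no longer passes a ReservedOfferingMode explicitly and instead relies on the scheduler's default. Because the constants are reordered later in this commit so that ReservedOfferingModeFallback is the iota zero value, an options struct with no option functions applied resolves to fallback mode. Below is a minimal stand-in sketch (not Karpenter's actual types) illustrating that default:

package main

import "fmt"

// Stand-in for the scheduler's ReservedOfferingMode. It mirrors the reordered
// constants in this commit, where fallback is now the zero value.
type ReservedOfferingMode int

const (
	ReservedOfferingModeFallback ReservedOfferingMode = iota // 0: the default when no options are passed
	ReservedOfferingModeStrict                               // 1: opt-in via an option such as DisableReservedFallback
)

type options struct{ reservedOfferingMode ReservedOfferingMode }

func main() {
	var o options // no option functions applied, as in SimulateScheduling above
	fmt.Println(o.reservedOfferingMode == ReservedOfferingModeFallback) // true
}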
9 changes: 6 additions & 3 deletions pkg/controllers/provisioning/provisioner.go
@@ -217,7 +217,7 @@ func (p *Provisioner) NewScheduler(
ctx context.Context,
pods []*corev1.Pod,
stateNodes []*state.StateNode,
reservedOfferingMode scheduler.ReservedOfferingMode,
opts ...scheduler.Options,
) (*scheduler.Scheduler, error) {
nodePools, err := nodepoolutils.ListManaged(ctx, p.kubeClient, p.cloudProvider)
if err != nil {
@@ -266,7 +266,7 @@ func (p *Provisioner) NewScheduler(
if err != nil {
return nil, fmt.Errorf("getting daemon pods, %w", err)
}
return scheduler.NewScheduler(ctx, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock, reservedOfferingMode), nil
return scheduler.NewScheduler(ctx, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock, opts...), nil
}

func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
@@ -304,7 +304,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
return scheduler.Results{}, nil
}
log.FromContext(ctx).V(1).WithValues("pending-pods", len(pendingPods), "deleting-pods", len(deletingNodePods)).Info("computing scheduling decision for provisionable pod(s)")
s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.ReservedOfferingModeStrict)
s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.DisableReservedFallback)
if err != nil {
if errors.Is(err, ErrNodePoolsNotFound) {
log.FromContext(ctx).Info("no nodepools found")
@@ -317,6 +317,9 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
if len(results.ReservedOfferingErrors) != 0 {
log.FromContext(ctx).V(1).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5)).Info("deferring scheduling decision for provisionable pod(s) to future simulation due to limited reserved offering capacity")
}
scheduler.UnschedulablePodsCount.Set(float64(len(results.PodErrors)), map[string]string{scheduler.ControllerLabel: injection.GetControllerName(ctx)})
if len(results.NewNodeClaims) > 0 {
log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)")
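The provisioning path now separates pods that failed only because reserved capacity was exhausted (Results.ReservedOfferingErrors) from genuinely unschedulable pods (Results.PodErrors), deferring the former to a future simulation instead of reporting them as failures. A rough, self-contained sketch of that caller-side distinction, using hypothetical stand-in types rather than the real scheduler.Results:

package main

import (
	"errors"
	"fmt"
)

// Results is a hypothetical stand-in; the field names follow this diff.
type Results struct {
	PodErrors              map[string]error
	ReservedOfferingErrors map[string]error
}

func main() {
	res := Results{
		PodErrors:              map[string]error{"default/app-a": errors.New("no instance type satisfies the pod's requirements")},
		ReservedOfferingErrors: map[string]error{"default/app-b": errors.New("all compatible reserved offerings are consumed")},
	}
	// Pods in PodErrors count toward the unschedulable-pods metric; pods in
	// ReservedOfferingErrors are only logged and retried in a later scheduling loop.
	fmt.Printf("unschedulable: %d, deferred: %d\n", len(res.PodErrors), len(res.ReservedOfferingErrors))
}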
54 changes: 40 additions & 14 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -23,6 +23,7 @@ import (
"sort"
"time"

"github.com/awslabs/operatorpkg/option"
"github.com/samber/lo"
"go.uber.org/multierr"
corev1 "k8s.io/api/core/v1"
@@ -57,14 +58,24 @@ type ReservedOfferingMode int
// I don't believe there's a solution to this short of the max-flow based instance selection algorithm, which has its own
// drawbacks.
const (
// ReservedOfferingModeStrict indicates that the scheduler should fail to add a pod to a nodeclaim if doing so would
// prevent it from scheduling to reserved capacity, when it would have otherwise.
ReservedOfferingModeStrict ReservedOfferingMode = iota
// ReservedOfferingModeFallback indicates to the scheduler that the addition of a pod to a nodeclaim which
// results in all potential reserved offerings being filtered out is allowed (e.g. on-demand / spot fallback).
ReservedOfferingModeFallback
ReservedOfferingModeFallback ReservedOfferingMode = iota
// ReservedOfferingModeStrict indicates that the scheduler should fail to add a pod to a nodeclaim if doing so would
// prevent it from scheduling to reserved capacity, when it would have otherwise.
ReservedOfferingModeStrict
)

type options struct {
reservedOfferingMode ReservedOfferingMode
}

type Options = option.Function[options]

var DisableReservedFallback = func(opts *options) {
opts.reservedOfferingMode = ReservedOfferingModeStrict
}

func NewScheduler(
ctx context.Context,
kubeClient client.Client,
@@ -76,7 +87,7 @@ func NewScheduler(
daemonSetPods []*corev1.Pod,
recorder events.Recorder,
clock clock.Clock,
reservedOfferingMode ReservedOfferingMode,
opts ...Options,
) *Scheduler {

// if any of the nodePools add a taint with a prefer no schedule effect, we add a toleration for the taint
@@ -115,7 +126,7 @@ }),
}),
clock: clock,
reservationManager: NewReservationManager(instanceTypes),
reservedOfferingMode: reservedOfferingMode,
reservedOfferingMode: option.Resolve(opts...).reservedOfferingMode,
}
s.calculateExistingNodeClaims(stateNodes, daemonSetPods)
return s
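NewScheduler now takes variadic functional options (option.Function[options] from github.com/awslabs/operatorpkg/option, resolved via option.Resolve) in place of a positional ReservedOfferingMode argument. The sketch below re-implements the pattern with only the standard library to show the mechanics; the names optionFunc, resolve, and disableReservedFallback are illustrative stand-ins, not the operatorpkg API:

package main

import "fmt"

// optionFunc mirrors the shape of option.Function[T]: a function that mutates
// a private options struct.
type optionFunc[T any] func(*T)

// resolve mirrors option.Resolve: start from the zero value and apply each option.
func resolve[T any](opts ...optionFunc[T]) T {
	var o T
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

type schedulerOptions struct{ strictReservedOfferings bool }

// disableReservedFallback plays the role of DisableReservedFallback: it moves
// the resolved options away from the zero-value (fallback) default.
func disableReservedFallback(o *schedulerOptions) { o.strictReservedOfferings = true }

func main() {
	fmt.Println(resolve[schedulerOptions]())                        // {false}: fallback allowed by default
	fmt.Println(resolve[schedulerOptions](disableReservedFallback)) // {true}: strict reserved-offering handling
}

One consequence of this shape is that new knobs can be added to the options struct without touching every NewScheduler call site, which is why the test and benchmark updates later in this commit only change the final argument.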
@@ -147,9 +158,10 @@ type Scheduler struct {

// Results contains the results of the scheduling operation
type Results struct {
NewNodeClaims []*NodeClaim
ExistingNodes []*ExistingNode
PodErrors map[*corev1.Pod]error
NewNodeClaims []*NodeClaim
ExistingNodes []*ExistingNode
PodErrors map[*corev1.Pod]error
ReservedOfferingErrors map[*corev1.Pod]error
}

// Record sends eventing and log messages back for the results that were produced from a scheduling run
@@ -245,6 +257,7 @@ func (r Results) TruncateInstanceTypes(maxInstanceTypes int) Results {
return r
}

//nolint:gocyclo
func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
defer metrics.Measure(DurationSeconds, map[string]string{ControllerLabel: injection.GetControllerName(ctx)})()
// We loop trying to schedule unschedulable pods as long as we are making progress. This solves a few
@@ -300,12 +313,25 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
m.FinalizeScheduling()
}

// Partition errors into reserved offering and standard errors. Reserved offering errors can only occur when the
// reserved offering mode is set to strict and we failed to schedule a pod due to all reservations being consumed.
// This is different from a standard error since we expect that the pod may be able to schedule in subsequent
// simulations.
reservedOfferingErrors := map[*corev1.Pod]error{}
for pod, err := range errors {
if IsReservedOfferingError(err) {
reservedOfferingErrors[pod] = err
}
}
for pod := range reservedOfferingErrors {
delete(errors, pod)
}

return Results{
NewNodeClaims: s.newNodeClaims,
ExistingNodes: s.existingNodes,
PodErrors: lo.OmitBy(errors, func(_ *corev1.Pod, err error) bool {
return IsReservedOfferingError(err)
}),
NewNodeClaims: s.newNodeClaims,
ExistingNodes: s.existingNodes,
PodErrors: errors,
ReservedOfferingErrors: reservedOfferingErrors,
}
}
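The partition above replaces the previous lo.OmitBy filter and keeps both halves of the split. Here is a standalone sketch of the same idea, with a hypothetical sentinel error standing in for the scheduler's reserved-offering error type (the real code detects it with IsReservedOfferingError):

package main

import (
	"errors"
	"fmt"
)

// errReservedOffering is a hypothetical sentinel; the scheduler uses a
// dedicated error type detected via IsReservedOfferingError instead.
var errReservedOffering = errors.New("all compatible reserved offerings are consumed")

// partition splits pod errors into reserved-offering errors (retryable in a
// later simulation) and everything else, mirroring the loop added in Solve.
func partition(errs map[string]error) (reserved, other map[string]error) {
	reserved, other = map[string]error{}, map[string]error{}
	for pod, err := range errs {
		if errors.Is(err, errReservedOffering) {
			reserved[pod] = err
			continue
		}
		other[pod] = err
	}
	return reserved, other
}

func main() {
	errs := map[string]error{
		"default/app-a": fmt.Errorf("adding pod to nodeclaim: %w", errReservedOffering),
		"default/app-b": errors.New("no instance type satisfies the pod's requirements"),
	}
	reserved, other := partition(errs)
	fmt.Println(len(reserved), len(other)) // 1 1
}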

@@ -173,7 +173,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
nil,
events.NewRecorder(&record.FakeRecorder{}),
clock,
scheduling.ReservedOfferingModeStrict,
scheduling.DisableReservedFallback,
)

b.ResetTimer()
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -3668,7 +3668,7 @@ var _ = Context("Scheduling", func() {
},
},
}) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value
s, err := prov.NewScheduler(ctx, pods, nil, scheduling.ReservedOfferingModeStrict)
s, err := prov.NewScheduler(ctx, pods, nil, scheduling.DisableReservedFallback)
Expect(err).To(BeNil())

var wg sync.WaitGroup
@@ -3740,7 +3740,7 @@ var _ = Context("Scheduling", func() {
},
},
}) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value
s, err := prov.NewScheduler(ctx, pods, nil, scheduling.ReservedOfferingModeStrict)
s, err := prov.NewScheduler(ctx, pods, nil, scheduling.DisableReservedFallback)
Expect(err).To(BeNil())
s.Solve(injection.WithControllerName(ctx, "provisioner"), pods)

