Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch all internal compute capacity to processing units only #66

Merged
merged 2 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/v1alpha1/spannerautoscaler_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (src *SpannerAutoscaler) ConvertTo(dstRaw conversion.Hub) error {
Max: int(*src.Spec.MaxProcessingUnits),
}
}
scaleConfig.ScaledownStepSize = int(*src.Spec.MaxScaleDownNodes)
scaleConfig.ScaledownStepSize = int(*src.Spec.MaxScaleDownNodes) * 1000
scaleConfig.TargetCPUUtilization = v1beta1.TargetCPUUtilization{
HighPriority: int(*src.Spec.TargetCPUUtilization.HighPriority),
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func (dst *SpannerAutoscaler) ConvertFrom(srcRaw conversion.Hub) error {
dst.Spec.MaxProcessingUnits = pointer.Int32(int32(src.Spec.ScaleConfig.ProcessingUnits.Max))
}

dst.Spec.MaxScaleDownNodes = pointer.Int32(int32(src.Spec.ScaleConfig.ScaledownStepSize))
dst.Spec.MaxScaleDownNodes = pointer.Int32(int32(src.Spec.ScaleConfig.ScaledownStepSize / 1000))
dst.Spec.TargetCPUUtilization = TargetCPUUtilization{
HighPriority: pointer.Int32(int32(src.Spec.ScaleConfig.TargetCPUUtilization.HighPriority)),
}
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/spannerautoscaler_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type ScaleConfig struct {

// The maximum number of processing units which can be deleted in one scale-down operation
// +kubebuilder:default=2000
// +kubebuilder:validation:MultipleOf=1000
ScaledownStepSize int `json:"scaledownStepSize,omitempty"`

// The CPU utilization which the autoscaling will try to achieve
Expand All @@ -113,6 +114,10 @@ type ScaleConfigPUs struct {
}

type TargetCPUUtilization struct {
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=100
// +kubebuilder:validation:ExclusiveMinimum=true
// +kubebuilder:validation:ExclusiveMaximum=true
HighPriority int `json:"highPriority"`
}

Expand Down
2 changes: 1 addition & 1 deletion api/v1beta1/spannerautoscaler_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *SpannerAutoscaler) Default() {

// set default ScaledownStepSize
if r.Spec.ScaleConfig.ScaledownStepSize == 0 {
r.Spec.ScaleConfig.ScaledownStepSize = 2
r.Spec.ScaleConfig.ScaledownStepSize = 2000
}

log.V(1).Info("finished setting defaults for spannerautoscaler resource", "name", r.Name, "resource", r)
Expand Down
9 changes: 4 additions & 5 deletions api/v1beta1/spannerautoscaler_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ var _ = Describe("SpannerAutoscaler validation", func() {
},
},
ScaleConfig: ScaleConfig{
ComputeType: ComputeTypeNode,
Nodes: ScaleConfigNodes{
Min: 1,
Max: 10,
ProcessingUnits: ScaleConfigPUs{
Min: 1000,
Max: 10000,
},
ScaledownStepSize: 2,
ScaledownStepSize: 2000,
TargetCPUUtilization: TargetCPUUtilization{
HighPriority: 30,
},
Expand Down
42 changes: 39 additions & 3 deletions config/crd/bases/spanner.mercari.com_spannerautoscalers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,13 @@ spec:
description: SpannerAutoscalerSpec defines the desired state of SpannerAutoscaler
properties:
authentication:
description: Authentication details for the Spanner instance
properties:
iamKeySecret:
description: 'Details of the k8s secret which contains the GCP
service account authentication key (in JSON). Ref: https://cloud.google.com/kubernetes-engine/docs/tutorials/authenticating-to-cloud-platform
This is a pointer because structs with string slices can not
be compared for zero values'
properties:
key:
type: string
Expand All @@ -272,8 +277,11 @@ spec:
- name
type: object
impersonateConfig:
description: This is a pointer because structs with string slices
can not be compared for zero values
description: 'Details of the GCP service account which will be
impersonated, for authentication to GCP. This can used only
on GKE clusters, when workload identity is enabled. Ref: https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity
This is a pointer because structs with string slices can not
be compared for zero values'
properties:
delegates:
items:
Expand All @@ -285,27 +293,43 @@ spec:
- targetServiceAccount
type: object
type:
description: Authentication method to be used for GCP authentication.
If `ImpersonateConfig` as well as `IAMKeySecret` is nil, this
will be set to use ADC be default.
enum:
- gcp-sa-key
- impersonation
- adc
type: string
type: object
scaleConfig:
description: Details of the autoscaling parameters for the Spanner
instance
properties:
computeType:
description: Whether to use `nodes` or `processing-units` for
scaling. This is only used at the time of CustomResource creation.
If compute capacity is provided in `nodes`, then it is automatically
converted to `processing-units` at the time of resource creation,
and internally, only `ProcessingUnits` are used for computations
and scaling.
enum:
- nodes
- processing-units
type: string
nodes:
description: If `nodes` are provided at the time of resource creation,
then they are automatically converted to `processing-units`.
So it is recommended to use only the processing units.
properties:
max:
type: integer
min:
type: integer
type: object
processingUnits:
description: 'ProcessingUnits for scaling of the Spanner instance:
https://cloud.google.com/spanner/docs/compute-capacity#compute_capacity'
properties:
max:
multipleOf: 100
Expand All @@ -318,11 +342,20 @@ spec:
- min
type: object
scaledownStepSize:
default: 2
default: 2000
description: The maximum number of processing units which can
be deleted in one scale-down operation
multipleOf: 1000
type: integer
targetCPUUtilization:
description: The CPU utilization which the autoscaling will try
to achieve
properties:
highPriority:
exclusiveMaximum: true
exclusiveMinimum: true
maximum: 100
minimum: 0
type: integer
required:
- highPriority
Expand All @@ -331,10 +364,13 @@ spec:
- targetCPUUtilization
type: object
targetInstance:
description: The Spanner instance which will be managed for autoscaling
properties:
instanceId:
description: The instance id of the Spanner instance
type: string
projectId:
description: The GCP Project id of the Spanner instance
type: string
required:
- instanceId
Expand Down
135 changes: 44 additions & 91 deletions controllers/spannerautoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ var (
errInvalidExclusiveCredentials = errors.New("impersonateConfig and iamKeySecret are mutually exclusive")
)

// TODO: move this to 'defaulting' webhook
const defaultScaledownStepSize = 2

// SpannerAutoscalerReconciler reconciles a SpannerAutoscaler object.
type SpannerAutoscalerReconciler struct {
ctrlClient ctrlclient.Client
Expand Down Expand Up @@ -159,11 +156,6 @@ func (r *SpannerAutoscalerReconciler) Reconcile(ctx context.Context, req ctrlrec
return ctrlreconcile.Result{}, nil
}

// TODO: move this to the defaulting webhook
if sa.Spec.ScaleConfig.ScaledownStepSize == 0 {
sa.Spec.ScaleConfig.ScaledownStepSize = defaultScaledownStepSize
}

log.V(1).Info("resource status", "spannerautoscaler", sa)

credentials, err := r.fetchCredentials(ctx, &sa)
Expand Down Expand Up @@ -204,20 +196,17 @@ func (r *SpannerAutoscalerReconciler) Reconcile(ctx context.Context, req ctrlrec
return ctrlreconcile.Result{}, nil
}

log.V(1).Info("checking to see if we need to calculate processing units", "sa", sa)
if !r.needCalcProcessingUnits(&sa) {
if sa.Status.InstanceState != spanner.StateReady {
log.Info("instance state is not ready")
return ctrlreconcile.Result{}, nil
}

if sa.Status.CurrentProcessingUnits == 0 {
log.Info("current processing units have not fetched yet")
return ctrlreconcile.Result{}, nil
}

// TODO: change this to pass the object instead of so many parameters
desiredProcessingUnits := calcDesiredProcessingUnits(
sa.Status.CurrentHighPriorityCPUUtilization,
normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes, sa.Spec.ScaleConfig.ComputeType),
sa.Spec.ScaleConfig.TargetCPUUtilization.HighPriority,
normalizeProcessingUnitsOrNodes(sa.Spec.ScaleConfig.ProcessingUnits.Min, sa.Spec.ScaleConfig.Nodes.Min, sa.Spec.ScaleConfig.ComputeType),
normalizeProcessingUnitsOrNodes(sa.Spec.ScaleConfig.ProcessingUnits.Max, sa.Spec.ScaleConfig.Nodes.Max, sa.Spec.ScaleConfig.ComputeType),
sa.Spec.ScaleConfig.ScaledownStepSize,
)
desiredProcessingUnits := calcDesiredProcessingUnits(sa)

now := r.clock.Now()
log.V(1).Info("processing units need to be changed", "desiredProcessingUnits", desiredProcessingUnits, "sa.Status", sa.Status)
Expand All @@ -233,9 +222,9 @@ func (r *SpannerAutoscalerReconciler) Reconcile(ctx context.Context, req ctrlrec
}

r.recorder.Eventf(&sa, corev1.EventTypeNormal, "Updated", "Updated processing units of %s/%s from %d to %d", sa.Spec.TargetInstance.ProjectID, sa.Spec.TargetInstance.InstanceID,
normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes, sa.Spec.ScaleConfig.ComputeType), desiredProcessingUnits)
sa.Status.CurrentProcessingUnits, desiredProcessingUnits)

log.Info("updated nodes via google cloud api", "before", normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes, sa.Spec.ScaleConfig.ComputeType), "after", desiredProcessingUnits)
log.Info("updated processing units via google cloud api", "before", sa.Status.CurrentProcessingUnits, "after", desiredProcessingUnits)

saCopy := sa.DeepCopy()
saCopy.Status.DesiredProcessingUnits = desiredProcessingUnits
Expand All @@ -251,18 +240,6 @@ func (r *SpannerAutoscalerReconciler) Reconcile(ctx context.Context, req ctrlrec
return ctrlreconcile.Result{}, nil
}

// TODO: convert all internal computations to processing units only
func normalizeProcessingUnitsOrNodes(pu, nodes int, computeType spannerv1beta1.ComputeType) int {
switch computeType {
case spannerv1beta1.ComputeTypePU:
return pu
case spannerv1beta1.ComputeTypeNode:
return nodes * 1000
default:
return -1
}
}

// SetupWithManager sets up the controller with ctrlmanager.Manager.
func (r *SpannerAutoscalerReconciler) SetupWithManager(mgr ctrlmanager.Manager) error {
opts := ctrlcontroller.Options{
Expand Down Expand Up @@ -293,43 +270,26 @@ func (r *SpannerAutoscalerReconciler) startSyncer(ctx context.Context, nn types.
return nil
}

func (r *SpannerAutoscalerReconciler) needCalcProcessingUnits(sa *spannerv1beta1.SpannerAutoscaler) bool {
log := r.log

switch {
// TODO: Fix this to use only processing units
case sa.Status.CurrentProcessingUnits == 0 && sa.Status.CurrentNodes == 0:
log.Info("current processing units have not fetched yet")
return false

case sa.Status.InstanceState != spanner.StateReady:
log.Info("instance state is not ready")
return false
default:
return true
}
}

func (r *SpannerAutoscalerReconciler) needUpdateProcessingUnits(sa *spannerv1beta1.SpannerAutoscaler, desiredProcessingUnits int, now time.Time) bool {
log := r.log

currentProcessingUnits := normalizeProcessingUnitsOrNodes(sa.Status.CurrentProcessingUnits, sa.Status.CurrentNodes, sa.Spec.ScaleConfig.ComputeType)
currentProcessingUnits := sa.Status.CurrentProcessingUnits

switch {
case desiredProcessingUnits == currentProcessingUnits:
log.V(0).Info("the desired number of processing units is equal to that of the current; no need to scale")
log.Info("the desired number of processing units is equal to that of the current; no need to scale")
return false

case desiredProcessingUnits > currentProcessingUnits && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(10*time.Second)):
log.Info("too short to scale up since instance scaled last",
log.Info("too short to scale up since last scale-up event",
"now", now.String(),
"last scale time", sa.Status.LastScaleTime,
)

return false

case desiredProcessingUnits < currentProcessingUnits && r.clock.Now().Before(sa.Status.LastScaleTime.Time.Add(r.scaleDownInterval)):
log.Info("too short to scale down since instance scaled nodes last",
log.Info("too short to scale down since last scale-up event",
"now", now.String(),
"last scale time", sa.Status.LastScaleTime,
)
Expand All @@ -341,49 +301,42 @@ func (r *SpannerAutoscalerReconciler) needUpdateProcessingUnits(sa *spannerv1bet
}
}

// For testing purpose only
func calcDesiredNodes(currentCPU, currentNodes, targetCPU, minNodes, maxNodes, scaledownStepSize int) int {
return calcDesiredProcessingUnits(currentCPU, currentNodes*1000, targetCPU, minNodes*1000, maxNodes*1000, scaledownStepSize) / 1000
}

// nextValidProcessingUnits finds next valid value in processing units.
// https://cloud.google.com/spanner/docs/compute-capacity?hl=en
// Valid values are
// If processingUnits < 1000, processing units must be multiples of 100.
// If processingUnits >= 1000, processing units must be multiples of 1000.
func nextValidProcessingUnits(processingUnits int) int {
if processingUnits < 1000 {
return ((processingUnits / 100) + 1) * 100
// calcDesiredProcessingUnits calculates the values needed to keep CPU utilization below TargetCPU.
func calcDesiredProcessingUnits(sa spannerv1beta1.SpannerAutoscaler) int {
totalCPU := sa.Status.CurrentHighPriorityCPUUtilization * sa.Status.CurrentProcessingUnits

requiredPU := totalCPU / sa.Spec.ScaleConfig.TargetCPUUtilization.HighPriority

var desiredPU int

// https://cloud.google.com/spanner/docs/compute-capacity?hl=en
// Valid values for processing units are:
// If processingUnits < 1000, processing units must be multiples of 100.
// If processingUnits >= 1000, processing units must be multiples of 1000.
//
// Round up the requiredPU value to make it valid
// If it is already a valid PU, increment to next unit to keep CPU usage below desired threshold
if requiredPU < 1000 {
desiredPU = ((requiredPU / 100) + 1) * 100
} else {
desiredPU = ((requiredPU / 1000) + 1) * 1000
}
return ((processingUnits / 1000) + 1) * 1000
}

func maxInt(first int, rest ...int) int {
result := first
for _, v := range rest {
if result < v {
result = v
}
// in case of scaling down, check that we don't scale down beyond the ScaledownStepSize
if scaledDownPU := (sa.Status.CurrentProcessingUnits - sa.Spec.ScaleConfig.ScaledownStepSize); desiredPU < scaledDownPU {
desiredPU = scaledDownPU
}
return result
}

// calcDesiredProcessingUnits calculates the values needed to keep CPU utilization below TargetCPU.
func calcDesiredProcessingUnits(currentCPU, currentProcessingUnits, targetCPU, minProcessingUnits, maxProcessingUnits, scaledownStepSize int) int {
totalCPUProduct1000 := currentCPU * currentProcessingUnits

desiredProcessingUnits := maxInt(nextValidProcessingUnits(totalCPUProduct1000/targetCPU), currentProcessingUnits-scaledownStepSize*1000)

switch {
case desiredProcessingUnits < minProcessingUnits:
return minProcessingUnits

case desiredProcessingUnits > maxProcessingUnits:
return maxProcessingUnits
// keep the scaling between the specified min/max range
if desiredPU < sa.Spec.ScaleConfig.ProcessingUnits.Min {
desiredPU = sa.Spec.ScaleConfig.ProcessingUnits.Min
}

default:
return desiredProcessingUnits
if desiredPU > sa.Spec.ScaleConfig.ProcessingUnits.Max {
desiredPU = sa.Spec.ScaleConfig.ProcessingUnits.Max
}

return desiredPU
}

func (r *SpannerAutoscalerReconciler) fetchCredentials(ctx context.Context, sa *spannerv1beta1.SpannerAutoscaler) (*syncer.Credentials, error) {
Expand Down
Loading