From a17ac2af44bfca8c8cbe75fd6396f2d771d87cb7 Mon Sep 17 00:00:00 2001 From: silenceper Date: Tue, 13 Oct 2020 19:36:46 +0800 Subject: [PATCH] support cpu/memory scaler (#1215) Signed-off-by: silenceper --- CHANGELOG.md | 3 +- api/v1alpha1/scaledobject_types.go | 3 +- api/v1alpha1/zz_generated.deepcopy.go | 16 +--- config/crd/bases/keda.sh_scaledobjects.yaml | 61 +----------- controllers/hpa.go | 35 +++---- pkg/scalers/cpu_memory_scaler.go | 101 ++++++++++++++++++++ pkg/scalers/cpu_memory_scaler_test.go | 57 +++++++++++ pkg/scaling/scale_handler.go | 22 ++++- 8 files changed, 201 insertions(+), 97 deletions(-) create mode 100644 pkg/scalers/cpu_memory_scaler.go create mode 100644 pkg/scalers/cpu_memory_scaler_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a206811e89..48cf8f6e2b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,6 @@ - Define KEDA readiness and liveness probes ([#788](https://github.com/kedacore/keda/issues/788)) - KEDA Support for configurable scaling behavior in HPA v2beta2 ([#802](https://github.com/kedacore/keda/issues/802)) - Add External Push scaler ([#820](https://github.com/kedacore/keda/issues/820) | [docs](https://keda.sh/docs/2.0/scalers/external-push/)) -- Add Standard Resource metrics to KEDA ([#874](https://github.com/kedacore/keda/pull/874)) - Managed Identity support for Azure Monitor scaler ([#936](https://github.com/kedacore/keda/issues/936)) - Add support for multiple triggers on ScaledObject ([#476](https://github.com/kedacore/keda/issues/476)) - Add consumer offset reset policy option to Kafka scaler ([#925](https://github.com/kedacore/keda/pull/925)) @@ -31,7 +30,7 @@ - Add support for multiple redis list types in redis list scaler ([#1006](https://github.com/kedacore/keda/pull/1006)) | [docs](https://keda.sh/docs/2.0/scalers/redis-lists/)) - Introduce Azure Log Analytics scaler ([#1061](https://github.com/kedacore/keda/issues/1061)) | [docs](https://keda.sh/docs/2.0/scalers/azure-log-analytics/)) - Add Metrics API Scaler ([#1026](https://github.com/kedacore/keda/pull/1026)) - +- Add cpu/memory Scaler ([#1215](https://github.com/kedacore/keda/pull/1215)) ### Improvements diff --git a/api/v1alpha1/scaledobject_types.go b/api/v1alpha1/scaledobject_types.go index 26737337069..2a0cc35b184 100644 --- a/api/v1alpha1/scaledobject_types.go +++ b/api/v1alpha1/scaledobject_types.go @@ -54,7 +54,6 @@ type AdvancedConfig struct { // HorizontalPodAutoscalerConfig specifies horizontal scale config type HorizontalPodAutoscalerConfig struct { - ResourceMetrics []*autoscalingv2beta2.ResourceMetricSource `json:"resourceMetrics,omitempty"` // +optional Behavior *autoscalingv2beta2.HorizontalPodAutoscalerBehavior `json:"behavior,omitempty"` } @@ -94,6 +93,8 @@ type ScaledObjectStatus struct { // +optional ExternalMetricNames []string `json:"externalMetricNames,omitempty"` // +optional + ResourceMetricNames []string `json:"resourceMetricNames,omitempty"` + // +optional Conditions Conditions `json:"conditions,omitempty"` } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index e03193ea5f2..3ca8841e234 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -183,17 +183,6 @@ func (in *HashiCorpVault) DeepCopy() *HashiCorpVault { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HorizontalPodAutoscalerConfig) DeepCopyInto(out *HorizontalPodAutoscalerConfig) { *out = *in - if in.ResourceMetrics != nil { - in, out := &in.ResourceMetrics, &out.ResourceMetrics - *out = make([]*v2beta2.ResourceMetricSource, len(*in)) - for i := range *in { - if (*in)[i] != nil { - in, out := &(*in)[i], &(*out)[i] - *out = new(v2beta2.ResourceMetricSource) - (*in).DeepCopyInto(*out) - } - } - } if in.Behavior != nil { in, out := &in.Behavior, &out.Behavior *out = new(v2beta2.HorizontalPodAutoscalerBehavior) @@ -532,6 +521,11 @@ func (in *ScaledObjectStatus) DeepCopyInto(out *ScaledObjectStatus) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.ResourceMetricNames != nil { + in, out := &in.ResourceMetricNames, &out.ResourceMetricNames + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make(Conditions, len(*in)) diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml index 26936fe04f0..ab6fe3dc45e 100644 --- a/config/crd/bases/keda.sh_scaledobjects.yaml +++ b/config/crd/bases/keda.sh_scaledobjects.yaml @@ -187,63 +187,6 @@ spec: type: integer type: object type: object - resourceMetrics: - items: - description: ResourceMetricSource indicates how to scale - on a resource metric known to Kubernetes, as specified - in requests and limits, describing each pod in the current - scale target (e.g. CPU or memory). The values will be - averaged together before being compared to the target. Such - metrics are built in to Kubernetes, and have special scaling - options on top of those available to normal per-pod metrics - using the "pods" source. Only one "target" type should - be set. - properties: - name: - description: name is the name of the resource in question. - type: string - target: - description: target specifies the target value for the - given metric - properties: - averageUtilization: - description: averageUtilization is the target value - of the average of the resource metric across all - relevant pods, represented as a percentage of - the requested value of the resource for the pods. - Currently only valid for Resource metric source - type - format: int32 - type: integer - averageValue: - anyOf: - - type: integer - - type: string - description: averageValue is the target value of - the average of the metric across all relevant - pods (as a quantity) - pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ - x-kubernetes-int-or-string: true - type: - description: type represents whether the metric - type is Utilization, Value, or AverageValue - type: string - value: - anyOf: - - type: integer - - type: string - description: value is the target value of the metric - (as a quantity). - pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ - x-kubernetes-int-or-string: true - required: - - type - type: object - required: - - name - - target - type: object - type: array type: object restoreToOriginalReplicaCount: type: boolean @@ -342,6 +285,10 @@ spec: originalReplicaCount: format: int32 type: integer + resourceMetricNames: + items: + type: string + type: array scaleTargetGVKR: description: GroupVersionKindResource provides unified structure for schema.GroupVersionKind and Resource diff --git a/controllers/hpa.go b/controllers/hpa.go index 764957f3f68..26076ccad56 100644 --- a/controllers/hpa.go +++ b/controllers/hpa.go @@ -135,6 +135,7 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(logger logr.Logger, scaledObj func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) { var scaledObjectMetricSpecs []autoscalingv2beta2.MetricSpec var externalMetricNames []string + var resourceMetricNames []string scalers, err := r.scaleHandler.GetScalers(scaledObject) if err != nil { @@ -142,28 +143,28 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, return nil, err } - // Handling the Resource metrics through KEDA - if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig != nil { - metrics := getResourceMetrics(scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig.ResourceMetrics) - scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metrics...) - } - for _, scaler := range scalers { metricSpecs := scaler.GetMetricSpecForScaling() - // add the scaledObjectName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it. for _, metricSpec := range metricSpecs { - metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)} - metricSpec.External.Metric.Selector.MatchLabels["scaledObjectName"] = scaledObject.Name - externalMetricNames = append(externalMetricNames, metricSpec.External.Metric.Name) + if metricSpec.Resource != nil { + resourceMetricNames = append(resourceMetricNames, string(metricSpec.Resource.Name)) + } + if metricSpec.External != nil { + // add the scaledObjectName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it. + metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)} + metricSpec.External.Metric.Selector.MatchLabels["scaledObjectName"] = scaledObject.Name + externalMetricNames = append(externalMetricNames, metricSpec.External.Metric.Name) + } } scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...) scaler.Close() } - // store External.MetricNames used by scalers defined in the ScaledObject + // store External.MetricNames,Resource.MetricsNames used by scalers defined in the ScaledObject status := scaledObject.Status.DeepCopy() status.ExternalMetricNames = externalMetricNames + status.ResourceMetricNames = resourceMetricNames err = kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status) if err != nil { logger.Error(err, "Error updating scaledObject status with used externalMetricNames") @@ -173,18 +174,6 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, return scaledObjectMetricSpecs, nil } -func getResourceMetrics(resourceMetrics []*autoscalingv2beta2.ResourceMetricSource) []autoscalingv2beta2.MetricSpec { - metrics := make([]autoscalingv2beta2.MetricSpec, 0, len(resourceMetrics)) - for _, resourceMetric := range resourceMetrics { - metrics = append(metrics, autoscalingv2beta2.MetricSpec{ - Type: "Resource", - Resource: resourceMetric, - }) - } - - return metrics -} - // checkMinK8sVersionforHPABehavior min version (k8s v1.18) for HPA Behavior func (r *ScaledObjectReconciler) checkMinK8sVersionforHPABehavior(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) { if r.kubeVersion.MinorVersion < 18 { diff --git a/pkg/scalers/cpu_memory_scaler.go b/pkg/scalers/cpu_memory_scaler.go new file mode 100644 index 00000000000..b48f199b2d2 --- /dev/null +++ b/pkg/scalers/cpu_memory_scaler.go @@ -0,0 +1,101 @@ +package scalers + +import ( + "context" + "fmt" + "strconv" + + "k8s.io/api/autoscaling/v2beta2" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +type cpuMemoryScaler struct { + metadata *cpuMemoryMetadata + resourceName v1.ResourceName +} + +type cpuMemoryMetadata struct { + Type v2beta2.MetricTargetType + Value *resource.Quantity + AverageValue *resource.Quantity + AverageUtilization *int32 +} + +// NewCPUMemoryScaler creates a new cpuMemoryScaler +func NewCPUMemoryScaler(resourceName v1.ResourceName, config *ScalerConfig) (Scaler, error) { + meta, parseErr := parseResourceMetadata(config) + if parseErr != nil { + return nil, fmt.Errorf("error parsing %s metadata: %s", resourceName, parseErr) + } + + return &cpuMemoryScaler{ + metadata: meta, + resourceName: resourceName, + }, nil +} + +func parseResourceMetadata(config *ScalerConfig) (*cpuMemoryMetadata, error) { + meta := &cpuMemoryMetadata{} + if val, ok := config.TriggerMetadata["type"]; ok && val != "" { + meta.Type = v2beta2.MetricTargetType(val) + } else { + return nil, fmt.Errorf("no type given") + } + + var value string + var ok bool + if value, ok = config.TriggerMetadata["value"]; !ok || value == "" { + return nil, fmt.Errorf("no value given") + } + switch meta.Type { + case v2beta2.ValueMetricType: + valueQuantity := resource.MustParse(value) + meta.Value = &valueQuantity + case v2beta2.AverageValueMetricType: + averageValueQuantity := resource.MustParse(value) + meta.AverageValue = &averageValueQuantity + case v2beta2.UtilizationMetricType: + valueNum, err := strconv.Atoi(value) + if err != nil { + return nil, err + } + utilizationNum := int32(valueNum) + meta.AverageUtilization = &utilizationNum + default: + return nil, fmt.Errorf("unsupport type") + } + return meta, nil +} + +// IsActive always return true for cpu/memory scaler +func (s *cpuMemoryScaler) IsActive(ctx context.Context) (bool, error) { + return true, nil +} + +//Close no need for cpuMemory scaler +func (s *cpuMemoryScaler) Close() error { + return nil +} + +// GetMetricSpecForScaling returns the metric spec for the HPA +func (s *cpuMemoryScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + cpuMemoryMetric := &v2beta2.ResourceMetricSource{ + Name: s.resourceName, + Target: v2beta2.MetricTarget{ + Type: s.metadata.Type, + Value: s.metadata.Value, + AverageUtilization: s.metadata.AverageUtilization, + AverageValue: s.metadata.AverageValue, + }, + } + metricSpec := v2beta2.MetricSpec{Resource: cpuMemoryMetric, Type: v2beta2.ResourceMetricSourceType} + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics no need for cpu/memory scaler +func (s *cpuMemoryScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + return nil, nil +} diff --git a/pkg/scalers/cpu_memory_scaler_test.go b/pkg/scalers/cpu_memory_scaler_test.go new file mode 100644 index 00000000000..e6c32b3d46e --- /dev/null +++ b/pkg/scalers/cpu_memory_scaler_test.go @@ -0,0 +1,57 @@ +package scalers + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/api/autoscaling/v2beta2" + v1 "k8s.io/api/core/v1" +) + +type parseCPUMemoryMetadataTestData struct { + metadata map[string]string + isError bool +} + +// A complete valid metadata example for reference +var validCPUMemoryMetadata = map[string]string{ + "type": "Utilization", + "value": "50", +} + +var testCPUMemoryMetadata = []parseCPUMemoryMetadataTestData{ + {map[string]string{}, true}, + {validCPUMemoryMetadata, false}, + {map[string]string{"type": "Utilization", "value": "50"}, false}, + {map[string]string{"type": "Value", "value": "50"}, false}, + {map[string]string{"type": "AverageValue", "value": "50"}, false}, + {map[string]string{"type": "AverageValue"}, true}, + {map[string]string{"type": "xxx", "value": "50"}, true}, +} + +func TestCPUMemoryParseMetadata(t *testing.T) { + for _, testData := range testCPUMemoryMetadata { + config := &ScalerConfig{ + TriggerMetadata: testData.metadata, + } + _, err := parseResourceMetadata(config) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestGetMetricSpecForScaling(t *testing.T) { + config := &ScalerConfig{ + TriggerMetadata: validCPUMemoryMetadata, + } + scaler, _ := NewCPUMemoryScaler(v1.ResourceCPU, config) + metricSpec := scaler.GetMetricSpecForScaling() + + assert.Equal(t, metricSpec[0].Type, v2beta2.ResourceMetricSourceType) + assert.Equal(t, metricSpec[0].Resource.Name, v1.ResourceCPU) + assert.Equal(t, metricSpec[0].Resource.Target.Type, v2beta2.UtilizationMetricType) +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index b7a4477cefa..31c908df0fc 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -208,7 +208,12 @@ func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []s continue } else if isTriggerActive { isActive = true - h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", scaler.GetMetricSpecForScaling()[0].External.Metric.Name) + if scaler.GetMetricSpecForScaling()[0].External != nil { + h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", scaler.GetMetricSpecForScaling()[0].External.Metric.Name) + } + if scaler.GetMetricSpecForScaling()[0].Resource != nil { + h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", scaler.GetMetricSpecForScaling()[0].Resource.Name) + } break } } @@ -224,10 +229,15 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal for _, scaler := range scalers { scalerLogger := h.logger.WithValues("Scaler", scaler) + metricSpecs := scaler.GetMetricSpecForScaling() + //skip cpu/memory resource scaler + if metricSpecs[0].External == nil { + continue + } + isTriggerActive, err := scaler.IsActive(ctx) scalerLogger.Info("Active trigger", "isTriggerActive", isTriggerActive) - metricSpecs := scaler.GetMetricSpecForScaling() targetAverageValue = getTargetAverageValue(metricSpecs) @@ -254,7 +264,9 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal scalerLogger.Info("Scaler is active") } } - maxValue = min(scaledJob.MaxReplicaCount(), devideWithCeil(queueLength, targetAverageValue)) + if targetAverageValue != 0 { + maxValue = min(scaledJob.MaxReplicaCount(), devideWithCeil(queueLength, targetAverageValue)) + } h.logger.Info("Scaler maxValue", "maxValue", maxValue) return isActive, queueLength, maxValue } @@ -405,6 +417,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal return scalers.NewAzureQueueScaler(config) case "azure-servicebus": return scalers.NewAzureServiceBusScaler(config) + case "cpu": + return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config) case "cron": return scalers.NewCronScaler(config) case "external": @@ -419,6 +433,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal return scalers.NewKafkaScaler(config) case "liiklus": return scalers.NewLiiklusScaler(config) + case "memory": + return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config) case "metrics-api": return scalers.NewMetricsAPIScaler(config) case "mysql":