diff --git a/CHANGELOG.md b/CHANGELOG.md index 806c96b7704..ec008b45c69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,7 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### Fixes -- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +- **AWS SQS Scaler**: Respect `scaleOnInFlight` value ([#4276](https://github.com/kedacore/keda/issue/4276)) ### Deprecations diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index ae5f153ef87..be1c50cb87e 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -23,11 +23,15 @@ const ( defaultScaleOnInFlight = true ) -var awsSqsQueueMetricNames = []string{ +var awsSqsQueueMetricNamesForScalingInFlight = []string{ "ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible", } +var awsSqsQueueMetricNamesForNotScalingInFlight = []string{ + "ApproximateNumberOfMessages", +} + type awsSqsQueueScaler struct { metricType v2.MetricTargetType metadata *awsSqsQueueMetadata @@ -45,6 +49,7 @@ type awsSqsQueueMetadata struct { awsAuthorization awsAuthorizationMetadata scalerIndex int scaleOnInFlight bool + awsSqsQueueMetricNames []string } // NewAwsSqsQueueScaler creates a new awsSqsQueueScaler @@ -104,10 +109,10 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs } } - if !meta.scaleOnInFlight { - awsSqsQueueMetricNames = []string{ - "ApproximateNumberOfMessages", - } + if meta.scaleOnInFlight { + meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScalingInFlight + } else { + meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForNotScalingInFlight } if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" { @@ -198,7 +203,7 @@ func (s *awsSqsQueueScaler) GetMetricsAndActivity(ctx context.Context, metricNam // Get SQS Queue Length func (s *awsSqsQueueScaler) getAwsSqsQueueLength() (int64, error) { input := &sqs.GetQueueAttributesInput{ - AttributeNames: aws.StringSlice(awsSqsQueueMetricNames), + AttributeNames: aws.StringSlice(s.metadata.awsSqsQueueMetricNames), QueueUrl: aws.String(s.metadata.queueURL), } @@ -208,7 +213,7 @@ func (s *awsSqsQueueScaler) getAwsSqsQueueLength() (int64, error) { } var approximateNumberOfMessages int64 - for _, awsSqsQueueMetric := range awsSqsQueueMetricNames { + for _, awsSqsQueueMetric := range s.metadata.awsSqsQueueMetricNames { metricValue, err := strconv.ParseInt(*output.Attributes[awsSqsQueueMetric], 10, 32) if err != nil { return -1, err diff --git a/pkg/scalers/aws_sqs_queue_scaler_test.go b/pkg/scalers/aws_sqs_queue_scaler_test.go index 8f00bfeed7c..fc7e4cdd971 100644 --- a/pkg/scalers/aws_sqs_queue_scaler_test.go +++ b/pkg/scalers/aws_sqs_queue_scaler_test.go @@ -307,10 +307,43 @@ var awsSQSMetricIdentifiers = []awsSQSMetricIdentifier{ {&testAWSSQSMetadata[1], 1, "s1-aws-sqs-DeleteArtifactQ"}, } -var awsSQSGetMetricTestData = []*awsSqsQueueMetadata{ - {queueURL: testAWSSQSProperQueueURL}, - {queueURL: testAWSSQSErrorQueueURL}, - {queueURL: testAWSSQSBadDataQueueURL}, +var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{ + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnInFlight": "false"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaleOnInFlight disabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnInFlight": "true"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaleOnInFlight enabled"}, + {map[string]string{ + "queueURL": testAWSSQSErrorQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnInFlight": "false"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "error queue"}, + {map[string]string{ + "queueURL": testAWSSQSBadDataQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnInFlight": "true"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "bad data"}, } func TestSQSParseMetadata(t *testing.T) { @@ -343,8 +376,13 @@ func TestAWSSQSGetMetricSpecForScaling(t *testing.T) { } func TestAWSSQSScalerGetMetrics(t *testing.T) { - for _, meta := range awsSQSGetMetricTestData { + for index, testData := range awsSQSGetMetricTestData { + meta, err := parseAwsSqsQueueMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, ScalerIndex: index}, logr.Discard()) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } scaler := awsSqsQueueScaler{"", meta, &mockSqs{}, logr.Discard()} + value, _, err := scaler.GetMetricsAndActivity(context.Background(), "MetricName") switch meta.queueURL { case testAWSSQSErrorQueueURL: