From 20e1c0d714a90e806d885082922fc3cc8d5358c1 Mon Sep 17 00:00:00 2001 From: Amine Date: Mon, 4 Mar 2024 09:55:07 -0600 Subject: [PATCH] Introduce flags for fine-tuning maximum concurrent reconciles per resource (#141) This commit aims to improve the performance and scalability of the ACK service controllers by introducing support for configuring the maximum number of concurrent reconciles for individual resources. The primary motivation behind this change is to address varying workload demands and resource requirements in Kubernetes environements. This patch introduces two new flags (soon exposed to in the helm chart values): - `--reconcile-default-max-concurrent-syncs`: allow users to specify the default maximum concurrency level for all the resources. - `--reconcile-resource-max-concurrent-syncs`: enable users to define resource-specific maximum concurrency settings, overriding the default value. A use case example would be the scenario of an admin wanting to manage EKS resources using the ACK `eks-controller`. It is normal for each cluster to have multiple nodegroups/PIAs/AccessEntries/FargateProfiles, which can indicate the proportion of needed max concurrencies for each resource. For instance, you can configure the EKS Controller with a default maximum of 2 concurrent reconciles for all the resources, and override the maximum concurrency to 10 for `AccessEntries` and `PodIdentityAssociations` Signed-off-by: Amine Hilaly By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --- pkg/config/config.go | 154 +++++++++++++++++++++++++------------- pkg/config/config_test.go | 4 +- pkg/runtime/reconciler.go | 6 ++ 3 files changed, 110 insertions(+), 54 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 6030b59..5865f0b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -39,24 +39,26 @@ import ( ) const ( - flagEnableLeaderElection = "enable-leader-election" - flagLeaderElectionNamespace = "leader-election-namespace" - flagMetricAddr = "metrics-addr" - flagHealthzAddr = "healthz-addr" - flagEnableDevLogging = "enable-development-logging" - flagAWSRegion = "aws-region" - flagAWSEndpointURL = "aws-endpoint-url" - flagAWSIdentityEndpointURL = "aws-identity-endpoint-url" - flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls" - flagLogLevel = "log-level" - flagResourceTags = "resource-tags" - flagWatchNamespace = "watch-namespace" - flagEnableWebhookServer = "enable-webhook-server" - flagWebhookServerAddr = "webhook-server-addr" - flagDeletionPolicy = "deletion-policy" - flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds" - flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds" - envVarAWSRegion = "AWS_REGION" + flagEnableLeaderElection = "enable-leader-election" + flagLeaderElectionNamespace = "leader-election-namespace" + flagMetricAddr = "metrics-addr" + flagHealthzAddr = "healthz-addr" + flagEnableDevLogging = "enable-development-logging" + flagAWSRegion = "aws-region" + flagAWSEndpointURL = "aws-endpoint-url" + flagAWSIdentityEndpointURL = "aws-identity-endpoint-url" + flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls" + flagLogLevel = "log-level" + flagResourceTags = "resource-tags" + flagWatchNamespace = "watch-namespace" + flagEnableWebhookServer = "enable-webhook-server" + flagWebhookServerAddr = "webhook-server-addr" + flagDeletionPolicy = "deletion-policy" + flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds" + flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds" + flagReconcileDefaultMaxConcurrency = "reconcile-default-max-concurrent-syncs" + flagReconcileResourceMaxConcurrency = "reconcile-resource-max-concurrent-syncs" + envVarAWSRegion = "AWS_REGION" ) var ( @@ -74,24 +76,26 @@ var ( // Config contains configuration options for ACK service controllers type Config struct { - MetricsAddr string - HealthzAddr string - EnableLeaderElection bool - LeaderElectionNamespace string - EnableDevelopmentLogging bool - AccountID string - Region string - IdentityEndpointURL string - EndpointURL string - AllowUnsafeEndpointURL bool - LogLevel string - ResourceTags []string - WatchNamespace string - EnableWebhookServer bool - WebhookServerAddr string - DeletionPolicy ackv1alpha1.DeletionPolicy - ReconcileDefaultResyncSeconds int - ReconcileResourceResyncSeconds []string + MetricsAddr string + HealthzAddr string + EnableLeaderElection bool + LeaderElectionNamespace string + EnableDevelopmentLogging bool + AccountID string + Region string + IdentityEndpointURL string + EndpointURL string + AllowUnsafeEndpointURL bool + LogLevel string + ResourceTags []string + WatchNamespace string + EnableWebhookServer bool + WebhookServerAddr string + DeletionPolicy ackv1alpha1.DeletionPolicy + ReconcileDefaultResyncSeconds int + ReconcileResourceResyncSeconds []string + ReconcileDefaultMaxConcurrency int + ReconcileResourceMaxConcurrency []string } // BindFlags defines CLI/runtime configuration options @@ -202,6 +206,19 @@ func (cfg *Config) BindFlags() { " configuration maps resource kinds to drift remediation periods in seconds. If provided, "+ " resource-specific resync periods take precedence over the default period.", ) + flag.IntVar( + &cfg.ReconcileDefaultMaxConcurrency, flagReconcileDefaultMaxConcurrency, + 1, + "The default maximum number of concurrent reconciles for a resource reconciler. This value is used if no "+ + "resource-specific override has been specified. Default is 1.", + ) + flag.StringArrayVar( + &cfg.ReconcileResourceMaxConcurrency, flagReconcileResourceMaxConcurrency, + []string{}, + "A Key/Value list of strings representing the reconcile max concurrency configuration for each resource. This"+ + " configuration maps resource kinds to maximum number of concurrent reconciles. If provided, "+ + " resource-specific max concurrency takes precedence over the default max concurrency.", + ) } // SetupLogger initializes the logger used in the service controller @@ -222,7 +239,6 @@ func (cfg *Config) SetupLogger() { // SetAWSAccountID uses sts GetCallerIdentity API to find AWS AccountId and set // in Config func (cfg *Config) SetAWSAccountID() error { - awsCfg := aws.Config{} if cfg.IdentityEndpointURL != "" { awsCfg.Endpoint = aws.String(cfg.IdentityEndpointURL) @@ -297,6 +313,9 @@ func (cfg *Config) Validate(options ...Option) error { if cfg.ReconcileDefaultResyncSeconds < 0 { return fmt.Errorf("invalid value for flag '%s': resync seconds default must be greater than 0", flagReconcileDefaultResyncSeconds) } + if cfg.ReconcileDefaultMaxConcurrency < 1 { + return fmt.Errorf("invalid value for flag '%s': max concurrency default must be greater than 0", flagReconcileDefaultMaxConcurrency) + } return nil } @@ -309,28 +328,45 @@ func (cfg *Config) checkUnsafeEndpoint(endpoint *url.URL) error { return nil } -// validateReconcileConfigResources validates the --reconcile-resource-resync-seconds flag -// by checking the resource names and their corresponding duration. +// validateReconcileConfigResources validates the --reconcile-resource-resync-seconds and +// --reconcile-resource-max-concurrent-syncs flags. It ensures that the resource names provided +// in the flags are valid and managed by the controller. func (cfg *Config) validateReconcileConfigResources(supportedGVKs []schema.GroupVersionKind) error { validResourceNames := []string{} for _, gvk := range supportedGVKs { validResourceNames = append(validResourceNames, gvk.Kind) } - for _, resourceResyncSecondsFlag := range cfg.ReconcileResourceResyncSeconds { - resourceName, _, err := parseReconcileFlagArgument(resourceResyncSecondsFlag) - if err != nil { - return fmt.Errorf("error parsing flag argument '%v': %v. Expected format: resource=seconds", resourceResyncSecondsFlag, err) + for _, resourceFlagArgument := range cfg.ReconcileResourceResyncSeconds { + if err := validateReconcileConfigResource(validResourceNames, resourceFlagArgument); err != nil { + return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceResyncSeconds, err) } - if !ackutil.InStrings(resourceName, validResourceNames) { - return fmt.Errorf( - "error parsing flag argument '%v': resource '%v' is not managed by this controller. Expected one of %v", - resourceResyncSecondsFlag, resourceName, strings.Join(validResourceNames, ", "), - ) + } + for _, resourceFlagArgument := range cfg.ReconcileResourceMaxConcurrency { + if err := validateReconcileConfigResource(validResourceNames, resourceFlagArgument); err != nil { + return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceMaxConcurrency, err) } } return nil } +// validateReconcileConfigResource validates a single flag argument of any flag that is used to configure +// resource-specific reconcile settings. It ensures that the resource name is valid and managed by the +// controller, and that the value is a positive integer. If the flag argument is not in the expected format +// or has invalid elements, an error is returned. +func validateReconcileConfigResource(validResourceNames []string, resourceFlagArgument string) error { + resourceName, _, err := parseReconcileFlagArgument(resourceFlagArgument) + if err != nil { + return fmt.Errorf("error parsing flag argument '%v': %v. Expected format: string=number", resourceFlagArgument, err) + } + if !ackutil.InStrings(resourceName, validResourceNames) { + return fmt.Errorf( + "error parsing flag argument '%v': resource '%v' is not managed by this controller. Expected one of %v", + resourceFlagArgument, resourceName, strings.Join(validResourceNames, ", "), + ) + } + return nil +} + // ParseReconcileResourceResyncSeconds parses the values of the --reconcile-resource-resync-seconds // flag and returns a map that maps resource names to resync periods. // The flag arguments are expected to have the format "resource=seconds", where "resource" is the @@ -346,6 +382,20 @@ func (cfg *Config) ParseReconcileResourceResyncSeconds() (map[string]time.Durati return resourceResyncPeriods, nil } +// GetReconcileResourceMaxConcurrency returns the maximum number of concurrent reconciles for a +// given resource name. If the resource name is not found in the --reconcile-resource-max-concurrent-syncs +// flag, the function returns the default maximum concurrency value. +func (cfg *Config) GetReconcileResourceMaxConcurrency(resourceName string) int { + for _, resourceMaxConcurrencyFlag := range cfg.ReconcileResourceMaxConcurrency { + // Parse the resource name and max concurrency from the flag argument + name, maxConcurrency, _ := parseReconcileFlagArgument(resourceMaxConcurrencyFlag) + if strings.EqualFold(name, resourceName) { + return maxConcurrency + } + } + return cfg.ReconcileDefaultMaxConcurrency +} + // parseReconcileFlagArgument parses a flag argument of the form "key=value" into // its individual elements. The key must be a non-empty string and the value must be // a non-empty positive integer. If the flag argument is not in the expected format @@ -365,14 +415,14 @@ func parseReconcileFlagArgument(flagArgument string) (string, int, error) { return "", 0, fmt.Errorf("missing value in flag argument") } - resyncSeconds, err := strconv.Atoi(elements[1]) + value, err := strconv.Atoi(elements[1]) if err != nil { return "", 0, fmt.Errorf("invalid value in flag argument: %v", err) } - if resyncSeconds < 0 { - return "", 0, fmt.Errorf("invalid value in flag argument: expected non-negative integer, got %d", resyncSeconds) + if value <= 0 { + return "", 0, fmt.Errorf("invalid value in flag argument: value must be greater than 0") } - return elements[0], resyncSeconds, nil + return elements[0], value, nil } // GetWatchNamespaces returns a slice of namespaces to watch for custom resource events. diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index ef9ddd4..c98a8f5 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -39,8 +39,8 @@ func TestParseReconcileFlagArgument(t *testing.T) { {"=value", "", 0, true, "missing key in flag argument"}, {"key=value1=value2", "", 0, true, "invalid flag argument format: expected key=value"}, {"key=a", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"a\": invalid syntax"}, - {"key=-1", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -1"}, - {"key=-123456", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -123456"}, + {"key=-1", "", 0, true, "invalid value in flag argument: value must be greater than 0"}, + {"key=-123456", "", 0, true, "invalid value in flag argument: value must be greater than 0"}, {"key=1.1", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"1.1\": invalid syntax"}, } for _, test := range tests { diff --git a/pkg/runtime/reconciler.go b/pkg/runtime/reconciler.go index 16c24d6..d5956d3 100644 --- a/pkg/runtime/reconciler.go +++ b/pkg/runtime/reconciler.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ctrlrt "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + ctrlrtcontroller "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/predicate" ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1" @@ -99,12 +100,17 @@ func (r *resourceReconciler) BindControllerManager(mgr ctrlrt.Manager) error { r.kc = mgr.GetClient() r.apiReader = mgr.GetAPIReader() rd := r.rmf.ResourceDescriptor() + maxConcurrentReconciles := r.cfg.GetReconcileResourceMaxConcurrency(rd.GroupVersionKind().Kind) return ctrlrt.NewControllerManagedBy( mgr, ).For( rd.EmptyRuntimeObject(), ).WithEventFilter( predicate.GenerationChangedPredicate{}, + ).WithOptions( + ctrlrtcontroller.Options{ + MaxConcurrentReconciles: maxConcurrentReconciles, + }, ).Complete(r) }