From dda2022eddbee55ef9cb342938e6bc9c4fd5ba28 Mon Sep 17 00:00:00 2001 From: Amine Date: Wed, 31 Jul 2024 21:01:45 -0700 Subject: [PATCH] Wait for the `Namespace` and `Account` informers to sync before controller startup (#154) Add a new functionality to help waiting for the `Namespace` and `Account` cache informers to sync before proceeding with the creation of the reconcilers. Key changes: - Publish `informer.HasSynced` to the top level structs for namespace and accout caches - Use these `informer.HasSynced` to wait for the caches to sync using `"k8s.io/client-go/tools/cache"`. - We call the wait mechanism right after the function that spins the informers a.k.a `caches.Run`. Last to note, we are using a `context.TODO()` context, as a temporary workaround until we figure out a better mechanism for context cancellation. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --- pkg/runtime/cache/account.go | 2 ++ pkg/runtime/cache/cache.go | 11 ++++++++++- pkg/runtime/cache/namespace.go | 4 ++++ pkg/runtime/service_controller.go | 5 +++++ pkg/runtime/service_controller_test.go | 5 ++++- 5 files changed, 25 insertions(+), 2 deletions(-) diff --git a/pkg/runtime/cache/account.go b/pkg/runtime/cache/account.go index 8f275e2..56c95a0 100644 --- a/pkg/runtime/cache/account.go +++ b/pkg/runtime/cache/account.go @@ -50,6 +50,7 @@ type AccountCache struct { log logr.Logger roleARNs map[string]string configMapCreated bool + hasSynced func() bool } // NewAccountCache instanciate a new AccountCache. @@ -111,6 +112,7 @@ func (c *AccountCache) Run(clientSet kubernetes.Interface, stopCh <-chan struct{ }, }) go informer.Run(stopCh) + c.hasSynced = informer.HasSynced } // GetAccountRoleARN queries the AWS accountID associated Role ARN diff --git a/pkg/runtime/cache/cache.go b/pkg/runtime/cache/cache.go index e77fae3..06275cf 100644 --- a/pkg/runtime/cache/cache.go +++ b/pkg/runtime/cache/cache.go @@ -14,11 +14,13 @@ package cache import ( + "context" "time" "github.com/go-logr/logr" "github.com/jaypipes/envutil" kubernetes "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) const ( @@ -96,7 +98,14 @@ func (c Caches) Run(clientSet kubernetes.Interface) { if c.Namespaces != nil { c.Namespaces.Run(clientSet, stopCh) } - c.stopCh = stopCh +} + +// WaitForCachesToSync waits for both of the namespace and configMap +// informers to sync - by checking their hasSynced functions. +func (c Caches) WaitForCachesToSync(ctx context.Context) bool { + namespaceSynced := cache.WaitForCacheSync(ctx.Done(), c.Namespaces.hasSynced) + accountSynced := cache.WaitForCacheSync(ctx.Done(), c.Accounts.hasSynced) + return namespaceSynced && accountSynced } // Stop closes the stop channel and cause all the SharedInformers diff --git a/pkg/runtime/cache/namespace.go b/pkg/runtime/cache/namespace.go index d2e2326..ebe04ae 100644 --- a/pkg/runtime/cache/namespace.go +++ b/pkg/runtime/cache/namespace.go @@ -84,6 +84,9 @@ type NamespaceCache struct { watchScope []string // ignored is the list of namespaces we are ignoring ignored []string + // hasSynced is a function that will return true if namespace informer + // has received "at least" once the full list of the namespaces. + hasSynced func() bool } // NewNamespaceCache instanciate a new NamespaceCache. @@ -160,6 +163,7 @@ func (c *NamespaceCache) Run(clientSet kubernetes.Interface, stopCh <-chan struc }, }) go informer.Run(stopCh) + c.hasSynced = informer.HasSynced } // GetDefaultRegion returns the default region if it it exists diff --git a/pkg/runtime/service_controller.go b/pkg/runtime/service_controller.go index e203791..0e8ff55 100644 --- a/pkg/runtime/service_controller.go +++ b/pkg/runtime/service_controller.go @@ -14,6 +14,7 @@ package runtime import ( + "context" "fmt" "strings" "sync" @@ -233,6 +234,10 @@ func (c *serviceController) BindControllerManager(mgr ctrlrt.Manager, cfg ackcfg // Run the caches. This will not block as the caches are run in // separate goroutines. cache.Run(clientSet) + // Wait for the caches to sync + ctx := context.TODO() + synced := cache.WaitForCachesToSync(ctx) + c.log.Info("Waited for the caches to sync", "synced", synced) } if cfg.EnableAdoptedResourceReconciler { diff --git a/pkg/runtime/service_controller_test.go b/pkg/runtime/service_controller_test.go index 16222d5..67958d8 100644 --- a/pkg/runtime/service_controller_test.go +++ b/pkg/runtime/service_controller_test.go @@ -176,7 +176,10 @@ func TestServiceController(t *testing.T) { require.Empty(recons) mgr := &fakeManager{} - cfg := ackcfg.Config{} + cfg := ackcfg.Config{ + // Disable caches, by setting a mono-namespace watch mode + WatchNamespace: "default", + } err := sc.BindControllerManager(mgr, cfg) require.Nil(err)