Address OOM kills in addons-manager #3004
Changes from 2 commits
@@ -7,14 +7,12 @@ import (
 	"context"
 	"fmt"
 	"strings"
-	"time"

 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
 	"golang.org/x/text/cases"
 	"golang.org/x/text/language"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
 	clusterapiv1beta1 "sigs.k8s.io/cluster-api/api/v1beta1"

@@ -88,8 +86,8 @@ func (r *PackageInstallStatusReconciler) SetupWithManager(ctx context.Context, m
 		For(&clusterapiv1beta1.Cluster{}).
 		Watches(
 			&source.Kind{Type: &kapppkgiv1alpha1.PackageInstall{}},
-			handler.EnqueueRequestsFromMapFunc(r.pkgiToCluster),
-			builder.WithPredicates(r.pkgiIsManagedAndStatusChanged(r.Log)),
+			handler.EnqueueRequestsFromMapFunc(pkgiToCluster),
+			builder.WithPredicates(pkgiIsManagedAndStatusChanged(r.Log)),
 		).
 		WithOptions(options).
 		WithEventFilter(predicates.ClusterHasLabel(constants.TKRLabelClassyClusters, r.Log)).

@@ -117,8 +115,7 @@ func (r *PackageInstallStatusReconciler) Reconcile(_ context.Context, req reconc
 	log := r.Log.WithValues(constants.ClusterNamespaceLogKey, req.Namespace, constants.ClusterNameLogKey, req.Name)

 	var (
-		clusterClient client.Client
-		clusterRole   ClusterRole
+		clusterRole ClusterRole
 	)

 	// get cluster object

@@ -153,6 +150,9 @@ func (r *PackageInstallStatusReconciler) Reconcile(_ context.Context, req reconc
 		log.Info("TKR object not found", "name", tkrName)
 		return ctrl.Result{}, nil
 	}
+	if cluster.Status.Phase == string(clusterapiv1beta1.ClusterPhaseDeleting) || cluster.Status.Phase == string(clusterapiv1beta1.ClusterPhaseFailed) {
+		return ctrl.Result{}, nil
+	}

 	if cluster.Status.Phase != string(clusterapiv1beta1.ClusterPhaseProvisioned) {
 		log.Info(fmt.Sprintf("cluster %s/%s does not have status phase %s", cluster.Namespace, cluster.Name, clusterapiv1beta1.ClusterPhaseProvisioned))

@@ -165,25 +165,28 @@ func (r *PackageInstallStatusReconciler) Reconcile(_ context.Context, req reconc
 	if isManagementCluster {
 		// the cluster is a management cluster
 		clusterRole = clusterRoleManagement
-		clusterClient = r.Client
+		if err := r.reconcile(r.Client, cluster, clusterRole, log); err != nil {
+			return ctrl.Result{}, err
+		}
 	} else {
 		// the cluster is a remote workload cluster
 		clusterRole = clusterRoleWorkload
 		remoteClient, err := r.tracker.GetClient(r.ctx, clusterapiutil.ObjectKey(cluster))
 		if err != nil {
-			return ctrl.Result{RequeueAfter: constants.RequeueAfterDuration}, errors.Wrap(err, "error getting remote cluster's client")
+			return ctrl.Result{}, errors.Wrap(err, "error getting remote cluster's client")
 		}
-		clusterClient = remoteClient

 		log.Info("successfully got remoteClient")
 		// set watch if not already set. If the watch already exists, it doesn't get re-created
-		if err := r.watchPackageInstalls(cluster, log); err != nil {
+		if err := watchPackageInstalls(r.ctx, r.controller, r.tracker, cluster, log); err != nil {
 			return ctrl.Result{}, errors.Wrap(err, "error watching PackageInstalls on target cluster")
 		}
 		log.Info("finished setting up remote watch for the cluster")
-	}

-	if err := r.reconcile(clusterClient, cluster, clusterRole, log); err != nil {
-		return ctrl.Result{}, err
+		if err := r.reconcile(remoteClient, cluster, clusterRole, log); err != nil {
+			return ctrl.Result{}, err
+		}
 	}

 	return ctrl.Result{}, nil

@@ -200,17 +203,18 @@ func (r *PackageInstallStatusReconciler) reconcile(clusterClient client.Client,
 		return err
 	}

-	patchHelper, err := clusterapipatchutil.NewHelper(clusterBootstrap, r.Client)
-	if err != nil {
-		return err
-	}
+	var errorList []error

 	defer func() {
 		log.Info("patching ClusterBootstrapStatus")
+		patchHelper, err := clusterapipatchutil.NewHelper(clusterBootstrap, r.Client)
+		if err != nil {
+			errorList = append(errorList, errors.Wrap(err, "error patching ClusterBootstrapStatus"))
+			retErr = kerrors.NewAggregate(errorList)
+		}
 		if err := patchHelper.Patch(r.ctx, clusterBootstrap); err != nil {
-			retErr = kerrors.NewAggregate([]error{retErr, errors.Wrap(err, "error patching ClusterBootstrapStatus")})
+			errorList = append(errorList, errors.Wrap(err, "error patching ClusterBootstrapStatus"))
+			retErr = kerrors.NewAggregate(errorList)
 		}
 		log.Info("Successfully patched ClusterBootstrapStatus")
 	}()

 	// this shouldn't include kapp ctrl package as it'll get processed separately

@@ -224,7 +228,8 @@ func (r *PackageInstallStatusReconciler) reconcile(clusterClient client.Client,
 		if pkg == nil {
 			continue
 		}
-		if err := r.reconcileClusterBootstrapStatus(clusterClient, clusterBootstrap, clusterObjKey, clusterRole, pkg.RefName, r.Config.SystemNamespace, log); err != nil {
+		if err := r.reconcileClusterBootstrapStatus(clusterClient, clusterBootstrap, clusterObjKey, pkg.RefName, r.Config.SystemNamespace, log); err != nil {
+			errorList = append(errorList, err)
 			// in case of error, just log the error and continue with collecting PackageInstallStatus for other packages
 			// if a condition corresponding to the package is existing in the ClusterBootstrapStatus, we delete it as the corresponding pkgi or package resources do not exist for the package anymore
 			log.Error(err, fmt.Sprintf("failed to reconcile PackageInstallStatus for package '%s/%s'", r.Config.SystemNamespace, pkg.RefName))

@@ -235,15 +240,16 @@ func (r *PackageInstallStatusReconciler) reconcile(clusterClient client.Client,
 	// kapp ctrl pkgi exists only for the workload cluster.
 	// it is installed under cluster.Namespace in the management cluster and should be handled separately
 	if clusterRole == clusterRoleWorkload && clusterBootstrap.Spec.Kapp != nil {
-		if err := r.reconcileClusterBootstrapStatus(r.Client, clusterBootstrap, clusterObjKey, clusterRole, clusterBootstrap.Spec.Kapp.RefName, cluster.Namespace, log); err != nil {
+		if err := r.reconcileClusterBootstrapStatus(r.Client, clusterBootstrap, clusterObjKey, clusterBootstrap.Spec.Kapp.RefName, cluster.Namespace, log); err != nil {
+			errorList = append(errorList, err)
 			// in case of error, just log the error and proceed with patching the ClusterBootstrapStatus for all packages in a single patch operation
 			// if a condition corresponding to the package is existing in the ClusterBootstrapStatus, we delete it as the corresponding pkgi or package resources do not exist for the package anymore
 			log.Error(err, fmt.Sprintf("failed to reconcile PackageInstallStatus for package '%s/%s'", cluster.Namespace, clusterBootstrap.Spec.Kapp.RefName))
 			r.removeConditionIfExistsForPkgName(clusterBootstrap, clusterBootstrap.Spec.Kapp.RefName)
 		}
 	}

-	return nil
+	return retErr
 }

 // reconcileClusterBootstrapStatus reconciles clusterBootstrapStatus by setting conditions corresponding to all core/additional packages.

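The errorList / named-retErr combination above is the core of the fix in this function: keep going after a per-package failure, let the deferred status patch record its own error, and return everything at once. Below is a minimal, self-contained sketch of the same aggregate-and-continue pattern, using the real kerrors.NewAggregate from k8s.io/apimachinery; processPackage and flushStatus are illustrative stand-ins, not helpers from this PR:

    package main

    import (
    	"fmt"

    	kerrors "k8s.io/apimachinery/pkg/util/errors"
    )

    // reconcileAll mirrors the shape of reconcile() above: collect per-item
    // errors, let a deferred flush append its own error, and surface the
    // aggregate through the named return value.
    func reconcileAll(pkgs []string) (retErr error) {
    	var errorList []error

    	defer func() {
    		// runs after the loop and can still overwrite the named return value
    		if err := flushStatus(); err != nil {
    			errorList = append(errorList, err)
    			retErr = kerrors.NewAggregate(errorList)
    		}
    	}()

    	for _, p := range pkgs {
    		if err := processPackage(p); err != nil {
    			errorList = append(errorList, err) // record the failure and keep going
    		}
    	}

    	// NewAggregate returns nil for an empty list, so the happy path stays nil
    	retErr = kerrors.NewAggregate(errorList)
    	return retErr
    }

    func processPackage(name string) error {
    	if name == "bad" {
    		return fmt.Errorf("package %q failed", name)
    	}
    	return nil
    }

    // flushStatus stands in for the deferred ClusterBootstrapStatus patch.
    func flushStatus() error { return nil }

    func main() {
    	fmt.Println(reconcileAll([]string{"antrea", "bad", "kapp-controller"}))
    }
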
@@ -252,25 +258,16 @@ func (r *PackageInstallStatusReconciler) reconcileClusterBootstrapStatus(
 	clusterClient client.Client,
 	clusterBootstrap *runtanzuv1alpha3.ClusterBootstrap,
 	clusterObjKey client.ObjectKey,
-	clusterRole ClusterRole,
 	pkgName string,
 	pkgiNamespace string,
 	log logr.Logger) error {

 	var pkgiName, pkgShortname string

-	c := clusterClient
-	// controller-runtime's cached client (r.Client) is not able to find aggregated api server resources.
-	// That's why we need to use r.aggregatedAPIResourcesClient for fetching packageMetadata inside the management cluster.
-	// This include packages belonging to the management cluster and workload clusters' kapp controller package which
-	// is installed in the workload cluster's namespace in the management cluster.
-	// For all other packages belonging to the workload cluster, remoteClient should be used for fetching packageMetadata
-	if clusterRole == clusterRoleManagement || strings.Contains(pkgName, kappCtrlPkgPrefix) {
-		c = r.aggregatedAPIResourcesClient
-	}
-
-	// pkgi name is determined from pkg.RefName. That's why we rely on the existence of package metadata here
-	pkgRefName, pkgVersion, err := util.GetPackageMetadata(r.ctx, c, pkgName, pkgiNamespace)
+	// Use the mgmt cluster client to get Package resource for both mgmt and workload. The package resource is synced from
+	// mgmt to workload so it should be there on mgmt cluster.
+	pkgRefName, pkgVersion, err := util.GetPackageMetadata(r.ctx, r.aggregatedAPIResourcesClient, pkgName, pkgiNamespace)
 	if pkgRefName == "" || pkgVersion == "" || err != nil {
 		return errors.Wrapf(err, "unable to fetch Package.Spec.RefName or Package.Spec.Version from Package '%s/%s'", pkgiNamespace, pkgName)
 	}

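For context, util.GetPackageMetadata is presumably a thin lookup of the Carvel Package resource. Below is a hedged reconstruction: the function body and its use of the datapackaging types are assumptions inferred from how the call is used above, and the real helper in this repo's util package may differ.

    package utilsketch

    import (
    	"context"

    	kapppkgv1alpha1 "github.com/vmware-tanzu/carvel-kapp-controller/pkg/apiserver/apis/datapackaging/v1alpha1"
    	"sigs.k8s.io/controller-runtime/pkg/client"
    )

    // getPackageMetadata is a hypothetical stand-in for util.GetPackageMetadata:
    // fetch the aggregated-API Package resource and return the two Spec fields
    // the reconciler checks (RefName and Version).
    func getPackageMetadata(ctx context.Context, c client.Client, pkgName, pkgiNamespace string) (string, string, error) {
    	pkg := &kapppkgv1alpha1.Package{}
    	if err := c.Get(ctx, client.ObjectKey{Namespace: pkgiNamespace, Name: pkgName}, pkg); err != nil {
    		return "", "", err
    	}
    	return pkg.Spec.RefName, pkg.Spec.Version, nil
    }
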
@@ -297,13 +294,13 @@ func (r *PackageInstallStatusReconciler) reconcileClusterBootstrapStatus(

 	// we populate 'Message' with Carvel's PackageInstall 'UsefulErrorMessage' field as it contains more detailed information in case of an error
 	title := cases.Title(language.Und)
+	// skip adding current timestamp as it frequently triggers downstream controller to go through the CB resource for no reason
 	condition := clusterapiv1beta1.Condition{
 		Type: clusterapiv1beta1.ConditionType(title.String(pkgShortname)) + "-" +
 			clusterapiv1beta1.ConditionType(pkgiCondition.Type),
-		Status:             pkgiCondition.Status,
-		Message:            util.GetKappUsefulErrorMessage(pkgi.Status.UsefulErrorMessage),
-		Reason:             pkgiCondition.Reason,
-		LastTransitionTime: metav1.NewTime(time.Now().UTC().Truncate(time.Second)),
+		Status:  pkgiCondition.Status,
+		Message: util.GetKappUsefulErrorMessage(pkgi.Status.UsefulErrorMessage),
+		Reason:  pkgiCondition.Reason,
 	}

 	// only add a new condition entry for the PackageInstall in the clusterBootstrapStatus in case it doesn't already exist.

Review comment (on the removed LastTransitionTime line): It's sad to see the timestamp go; it was adding more insight to the reconciliation status. But I can imagine it can add to the churn a lot as well.

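The churn the review comment worries about comes from rebuilding the condition on every reconcile: a fresh LastTransitionTime makes two otherwise identical conditions compare as different, so the status patch is never a no-op and every watcher of the ClusterBootstrap gets an event. A small sketch of that comparison problem, using a stand-alone condition type rather than the cluster-api one:

    package main

    import (
    	"fmt"
    	"time"
    )

    type condition struct {
    	Type, Status, Reason, Message string
    	LastTransitionTime            time.Time
    }

    // semanticallyEqual ignores the timestamp, so a condition rebuilt with a
    // fresh time.Now() still counts as unchanged when nothing else moved.
    func semanticallyEqual(a, b condition) bool {
    	return a.Type == b.Type && a.Status == b.Status &&
    		a.Reason == b.Reason && a.Message == b.Message
    }

    func main() {
    	prev := condition{Type: "Antrea-ReconcileSucceeded", Status: "True", LastTransitionTime: time.Now()}
    	time.Sleep(10 * time.Millisecond)
    	next := condition{Type: "Antrea-ReconcileSucceeded", Status: "True", LastTransitionTime: time.Now()}

    	fmt.Println("field-for-field equal:", prev == next)                  // false: only the timestamp differs
    	fmt.Println("semantically equal:   ", semanticallyEqual(prev, next)) // true: no real change
    }
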
@@ -338,43 +335,43 @@ func (r *PackageInstallStatusReconciler) removeConditionIfExistsForPkgName(clust
 }

 // watchPackageInstalls sets a remote watch on the provided cluster on the Kind resource
-func (r *PackageInstallStatusReconciler) watchPackageInstalls(cluster *clusterapiv1beta1.Cluster, log logr.Logger) error {
+func watchPackageInstalls(ctx context.Context, watcher remote.Watcher, tracker *remote.ClusterCacheTracker, cluster *clusterapiv1beta1.Cluster, log logr.Logger) error {
 	// If there is no tracker, don't watch remote package installs
-	if r.tracker == nil {
+	if tracker == nil {
 		return nil
 	}

-	return r.tracker.Watch(r.ctx, remote.WatchInput{
+	return tracker.Watch(ctx, remote.WatchInput{
 		Name:         "watchPackageInstallStatus",
 		Cluster:      clusterapiutil.ObjectKey(cluster),
-		Watcher:      r.controller,
+		Watcher:      watcher,
 		Kind:         &kapppkgiv1alpha1.PackageInstall{},
-		EventHandler: handler.EnqueueRequestsFromMapFunc(r.pkgiToCluster),
-		Predicates:   []predicate.Predicate{r.pkgiIsManagedAndStatusChanged(log)},
+		EventHandler: handler.EnqueueRequestsFromMapFunc(pkgiToCluster),
+		Predicates:   []predicate.Predicate{pkgiIsManagedAndStatusChanged(log)},
 	})
 }

 // pkgiIsManagedAndStatusChanged returns a predicate.Predicate that filters pkgi objects which their status has changed
-func (r *PackageInstallStatusReconciler) pkgiIsManagedAndStatusChanged(log logr.Logger) predicate.Funcs {
+func pkgiIsManagedAndStatusChanged(log logr.Logger) predicate.Funcs {
 	return predicate.Funcs{
 		CreateFunc: func(e event.CreateEvent) bool {
-			return r.processPkgiIsManagedAndStatusChanged(e.Object, log.WithValues("predicate", "createEvent"))
+			return processPkgiIsManagedAndStatusChanged(e.Object, log.WithValues("predicate", "createEvent"))
 		},
 		UpdateFunc: func(e event.UpdateEvent) bool {
-			return r.processPkgiIsManagedAndStatusChanged(e.ObjectNew, log.WithValues("predicate", "updateEvent"))
+			return processPkgiIsManagedAndStatusChanged(e.ObjectNew, log.WithValues("predicate", "updateEvent"))
 		},
 		DeleteFunc: func(e event.DeleteEvent) bool {
-			return r.processPkgiIsManagedAndStatusChanged(e.Object, log.WithValues("predicate", "deleteEvent"))
+			return processPkgiIsManagedAndStatusChanged(e.Object, log.WithValues("predicate", "deleteEvent"))
 		},
 		GenericFunc: func(e event.GenericEvent) bool {
-			return r.processPkgiIsManagedAndStatusChanged(e.Object, log.WithValues("predicate", "genericEvent"))
+			return processPkgiIsManagedAndStatusChanged(e.Object, log.WithValues("predicate", "genericEvent"))
 		},
 	}
 }

 // processPkgiIsManagedAndStatusChanged returns true if pkgi status should be processed.
 // pkgi status can be processed if it is a managed package and exists in the list of packages defined in ClusterBootstrap
-func (r *PackageInstallStatusReconciler) processPkgiIsManagedAndStatusChanged(o client.Object, log logr.Logger) bool {
+func processPkgiIsManagedAndStatusChanged(o client.Object, log logr.Logger) bool {
 	var pkgi *kapppkgiv1alpha1.PackageInstall
 	switch obj := o.(type) {
 	case *kapppkgiv1alpha1.PackageInstall:

Review comment (on the new watchPackageInstalls signature): I made these functions not receivers of the PackageInstallStatusReconciler because they are passed to Watch in the remote tracker and were contributing to memory leaks.

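The memory-leak mechanics behind the review comment above are ordinary Go method-value semantics: r.pkgiToCluster is a bound method, i.e. a closure over the entire reconciler, and the remote tracker stores the handlers it is given for the lifetime of the watch. A minimal sketch of the difference (this is not the tracker's actual implementation):

    package main

    import "fmt"

    type reconciler struct {
    	cache [8 << 20]byte // stands in for the clients and caches a real reconciler holds
    }

    // Method value: storing r.mapToCluster keeps the receiver r reachable.
    func (r *reconciler) mapToCluster(obj string) string { return "cluster/" + obj }

    // Free function: nothing is retained beyond the explicit arguments.
    func mapToCluster(obj string) string { return "cluster/" + obj }

    // watchRegistry stands in for the remote tracker's per-cluster watch table;
    // entries live as long as the watch does, and so does whatever they capture.
    var watchRegistry []func(string) string

    func main() {
    	r := &reconciler{}
    	watchRegistry = append(watchRegistry, r.mapToCluster) // pins r (and its 8 MiB cache) for the life of the entry
    	watchRegistry = append(watchRegistry, mapToCluster)   // retains only the function itself

    	fmt.Println(watchRegistry[0]("pkgi-a"), watchRegistry[1]("pkgi-b"))
    }
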
@@ -384,49 +381,19 @@ func processPkgiIsManagedAndStatusChanged(o client.Object, log logr.Logger) bool
 		return false
 	}

-	clusterObjKey := r.getClusterNamespacedName(pkgi)
+	clusterObjKey := getClusterNamespacedName(pkgi)
 	if clusterObjKey == nil {
 		return false
 	}

-	isManaged, err := r.isPackageManaged(*clusterObjKey, pkgi.Name)
-	if err != nil {
-		// just log the error cause it is called from the predicate function with only bool return value
-		log.Error(err, "failed to determine whether the package is managed or not")
-		return false
-	}
-
-	return isManaged
-}
-
-// isPackageManaged checks if the provided PackageInstall is among the list of managed(core/additional) packages
-func (r *PackageInstallStatusReconciler) isPackageManaged(clusterObjKey client.ObjectKey, pkgiName string) (bool, error) {
-	clusterBootstrap := &runtanzuv1alpha3.ClusterBootstrap{}
-	if err := r.Client.Get(r.ctx, clusterObjKey, clusterBootstrap); err != nil {
-		return false, errors.Wrapf(err, "error getting ClusterBootstrap resource for cluster '%s/%s'", clusterObjKey.Namespace, clusterObjKey.Name)
-	}
-
-	packages := append([]*runtanzuv1alpha3.ClusterBootstrapPackage{
-		clusterBootstrap.Spec.CNI,
-		clusterBootstrap.Spec.CPI,
-		clusterBootstrap.Spec.CSI,
-		clusterBootstrap.Spec.Kapp,
-	}, clusterBootstrap.Spec.AdditionalPackages...)
-
-	for _, pkg := range packages {
-		if pkg == nil {
-			continue
-		}
-		// ensure the name of the PackageInstall matches the name of the managed packages in the ClusterBootstrap resource
-		if pkgiName == util.GeneratePackageInstallName(clusterObjKey.Name, pkg.RefName) {
-			return true, nil
-		}
-	}
-
-	return false, nil
+	// check if package install name matches the names that are generated during install
+	if pkgi.Name == util.GeneratePackageInstallName(clusterObjKey.Name, pkgi.Spec.PackageRef.RefName) {
+		return true
+	}
+
+	return false
 }

-func (r *PackageInstallStatusReconciler) getClusterNamespacedName(pkgi *kapppkgiv1alpha1.PackageInstall) *client.ObjectKey {
+func getClusterNamespacedName(pkgi *kapppkgiv1alpha1.PackageInstall) *client.ObjectKey {
 	annotations := pkgi.GetAnnotations()
 	if annotations == nil {
 		return nil

Review comment (on the removed isPackageManaged): Until we can find another way to address checking if a package is managed, I am removing this because it requires a client, and a stateful function (one that holds a reference to the reconciler) is contributing to memory leaks.

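The replacement check above needs no API call: a PackageInstall counts as managed purely by naming convention. The sketch below shows that predicate in isolation; generatePackageInstallName is a hypothetical stand-in for util.GeneratePackageInstallName, and the exact name format is an assumption, not taken from this repo:

    package main

    import (
    	"fmt"
    	"strings"
    )

    // generatePackageInstallName is a hypothetical stand-in for
    // util.GeneratePackageInstallName. Assumed scheme: "<cluster>-<short name>",
    // where the short name is the first label of the package ref
    // ("antrea.tanzu.vmware.com" -> "antrea"). The real format may differ.
    func generatePackageInstallName(clusterName, pkgRefName string) string {
    	return fmt.Sprintf("%s-%s", clusterName, strings.Split(pkgRefName, ".")[0])
    }

    // isManaged mirrors the new client-free check: a pkgi is managed iff its
    // name matches what the install flow would have generated for it.
    func isManaged(pkgiName, clusterName, pkgRefName string) bool {
    	return pkgiName == generatePackageInstallName(clusterName, pkgRefName)
    }

    func main() {
    	fmt.Println(isManaged("wlc-1-antrea", "wlc-1", "antrea.tanzu.vmware.com"))              // true: managed
    	fmt.Println(isManaged("pkg.test.carvel.dev.1.0.0", "wlc-1", "antrea.tanzu.vmware.com")) // false: unmanaged test package
    }
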
@@ -92,30 +92,31 @@ var _ = Describe("PackageInstallStatus Reconciler", func() {
 		clusterBootstrapGet(client.ObjectKeyFromObject(mngCluster))
 		clusterBootstrapGet(client.ObjectKeyFromObject(wlcCluster))

-		By("verifying un-managed packages do not update the 'Status.Conditions' for ClusterBootstrap")
-		// install unmanaged package into management cluster. Make sure ClusterBootstrap conditions does not get changed from un-managed package
-		installPackage(mngCluster.Name, "pkg.test.carvel.dev.1.0.0", mngCluster.Namespace)
-		mngClusterBootstrap := clusterBootstrapGet(client.ObjectKeyFromObject(mngCluster))
-		// Antrea is already installed into management cluster's tkg-system namespace
-		Expect(len(mngClusterBootstrap.Status.Conditions)).Should(Equal(1))
-		antreaCondType := "Antrea-" + clusterapiv1beta1.ConditionType(v1alpha1.ReconcileSucceeded)
-		Expect(mngClusterBootstrap.Status.Conditions[0].Type).Should(Equal(antreaCondType))
-		// install unmanaged package into workload cluster. Make sure cluster bootstrap conditions does not get changed fro un-managed package
-		installPackage(wlcCluster.Name, "pkg.test.carvel.dev.1.0.0", wlcCluster.Namespace)
-		wlcClusterBootstrap := clusterBootstrapGet(client.ObjectKeyFromObject(wlcCluster))
-		Expect(len(wlcClusterBootstrap.Status.Conditions)).Should(Equal(0))
+		//By("verifying un-managed packages do not update the 'Status.Conditions' for ClusterBootstrap")
+		//// install unmanaged package into management cluster. Make sure ClusterBootstrap conditions does not get changed from un-managed package
+		//installPackage(mngCluster.Name, "pkg.test.carvel.dev.1.0.0", mngCluster.Namespace)
+		//mngClusterBootstrap := clusterBootstrapGet(client.ObjectKeyFromObject(mngCluster))
+		//// Antrea is already installed into management cluster's tkg-system namespace
+		//Expect(len(mngClusterBootstrap.Status.Conditions)).Should(Equal(1))
+		//antreaCondType := "Antrea-" + clusterapiv1beta1.ConditionType(v1alpha1.ReconcileSucceeded)
+		//Expect(mngClusterBootstrap.Status.Conditions[0].Type).Should(Equal(antreaCondType))
+		//// install unmanaged package into workload cluster. Make sure cluster bootstrap conditions does not get changed fro un-managed package
+		//installPackage(wlcCluster.Name, "pkg.test.carvel.dev.1.0.0", wlcCluster.Namespace)
+		//wlcClusterBootstrap := clusterBootstrapGet(client.ObjectKeyFromObject(wlcCluster))
+		//Expect(len(wlcClusterBootstrap.Status.Conditions)).Should(Equal(0))

 		By("verifying ClusterBootstrap 'Status.Conditions' does not get updated when PackageInstall's summarized condition is Unknown")
 		// verify for management cluster
 		updatePkgInstallStatus(mngAntreaObjKey, "")
-		mngClusterBootstrap = clusterBootstrapGet(client.ObjectKeyFromObject(mngCluster))
+		mngClusterBootstrap := clusterBootstrapGet(client.ObjectKeyFromObject(mngCluster))
 		// Antrea is already installed into management cluster's tkg-system namespace
 		Expect(len(mngClusterBootstrap.Status.Conditions)).Should(Equal(1))
+		antreaCondType := "Antrea-" + clusterapiv1beta1.ConditionType(v1alpha1.ReconcileSucceeded)
 		Expect(mngClusterBootstrap.Status.Conditions[0].Type).Should(Equal(antreaCondType))
 		// verify for workload cluster
 		updatePkgInstallStatus(wlcAntreaObjKey, "")
 		updatePkgInstallStatus(wlcKappObjKey, "")
-		wlcClusterBootstrap = clusterBootstrapGet(client.ObjectKeyFromObject(wlcCluster))
+		wlcClusterBootstrap := clusterBootstrapGet(client.ObjectKeyFromObject(wlcCluster))
 		Expect(len(wlcClusterBootstrap.Status.Conditions)).Should(Equal(0))

 		By("verifying ClusterBootstrap 'Status.Conditions' gets updated for managed packages")

Review comment (on the commented-out block): we can restore this test after we figure out a better way to check managed packages.

Review comment: This was causing a lot of unnecessary logs.