From 21bf215744e8dba5a69718c24a3bdd1923283e7a Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Mon, 6 Dec 2021 15:09:04 +0100 Subject: [PATCH 1/4] chore: Parallel Kamelets install --- go.mod | 1 + pkg/install/kamelets.go | 106 +++++++++++++++++++++++----------------- 2 files changed, 63 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index 34497e8339..e11b03b66a 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( go.uber.org/multierr v1.6.0 go.uber.org/zap v1.19.1 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c gopkg.in/inf.v0 v0.9.1 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.21.4 diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go index 91bebee673..b2ad752751 100644 --- a/pkg/install/kamelets.go +++ b/pkg/install/kamelets.go @@ -1,12 +1,12 @@ /* Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with +contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at +the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 +http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -20,19 +20,22 @@ package install import ( "context" "fmt" - "io/ioutil" + "io/fs" "os" "path" + "path/filepath" "strings" - "github.com/apache/camel-k/pkg/util" + "golang.org/x/sync/errgroup" - "github.com/pkg/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "github.com/pkg/errors" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" + "github.com/apache/camel-k/pkg/util" "github.com/apache/camel-k/pkg/util/defaults" "github.com/apache/camel-k/pkg/util/kubernetes" ) @@ -42,7 +45,7 @@ const ( defaultKameletDir = "/kamelets/" ) -// KameletCatalog installs the bundled KameletCatalog into one namespace. +// KameletCatalog installs the bundled Kamelets into the specified namespace. func KameletCatalog(ctx context.Context, c client.Client, namespace string) error { kameletDir := os.Getenv(kameletDirEnv) if kameletDir == "" { @@ -58,57 +61,72 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro return fmt.Errorf("kamelet directory %q is a file", kameletDir) } - files, err := ioutil.ReadDir(kameletDir) + g, gCtx := errgroup.WithContext(ctx) + + err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error { + if err != nil { + return err + } + if f.IsDir() && f.Name() != d.Name() { + return fs.SkipDir + } + if !(strings.HasSuffix(f.Name(), ".yaml") || strings.HasSuffix(f.Name(), ".yml")) { + return nil + } + // We may want to throttle the creation of Go routines if the number of bundled Kamelets increases. + g.Go(func() error { + return createOrReplaceKamelet(gCtx, c, path.Join(kameletDir, f.Name()), namespace) + }) + return nil + }) if err != nil { return err } - for _, file := range files { - if file.IsDir() || !(strings.HasSuffix(file.Name(), ".yaml") || strings.HasSuffix(file.Name(), ".yml")) { - continue - } + return g.Wait() +} - content, err := util.ReadFile(path.Join(kameletDir, file.Name())) - if err != nil { - return err - } +func createOrReplaceKamelet(ctx context.Context, c client.Client, path string, namespace string) error { + fmt.Printf("Install file: %s in %s", path, namespace) + + content, err := util.ReadFile(path) + if err != nil { + return err + } - obj, err := kubernetes.LoadResourceFromYaml(c.GetScheme(), string(content)) + obj, err := kubernetes.LoadResourceFromYaml(c.GetScheme(), string(content)) + if err != nil { + return err + } + if k, ok := obj.(*v1alpha1.Kamelet); ok { + existing := &v1alpha1.Kamelet{} + err = c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: k.Name}, existing) if err != nil { - return err + if k8serrors.IsNotFound(err) { + existing = nil + } else { + return err + } } - if k, ok := obj.(*v1alpha1.Kamelet); ok { - existing := &v1alpha1.Kamelet{} - err = c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: k.Name}, existing) - if err != nil { - if k8serrors.IsNotFound(err) { - existing = nil - } else { - return err - } + + if existing == nil || existing.Labels[v1alpha1.KameletBundledLabel] == "true" { + if k.GetAnnotations() == nil { + k.SetAnnotations(make(map[string]string)) } + k.GetAnnotations()[kamelVersionAnnotation] = defaults.Version - if existing == nil || existing.Labels[v1alpha1.KameletBundledLabel] == "true" { - if k.GetAnnotations() == nil { - k.SetAnnotations(make(map[string]string)) - } - k.GetAnnotations()[kamelVersionAnnotation] = defaults.Version - - if k.GetLabels() == nil { - k.SetLabels(make(map[string]string)) - } - k.GetLabels()[v1alpha1.KameletBundledLabel] = "true" - k.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true" - - err := ObjectOrCollect(ctx, c, namespace, nil, true, k) - if err != nil { - return errors.Wrapf(err, "could not create resource from file %q", path.Join(kameletDir, file.Name())) - } + if k.GetLabels() == nil { + k.SetLabels(make(map[string]string)) } + k.GetLabels()[v1alpha1.KameletBundledLabel] = "true" + k.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true" + err := ObjectOrCollect(ctx, c, namespace, nil, true, k) + if err != nil { + return errors.Wrapf(err, "could not create resource from file %q", path) + } } } - return nil } From 237d11ac298d8aced6f09fb8faac992476a6be63 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Mon, 6 Dec 2021 15:39:52 +0100 Subject: [PATCH 2/4] chore: Use Server-Side Apply to install bundled Kamelets --- pkg/install/kamelets.go | 117 +++++++++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 31 deletions(-) diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go index b2ad752751..e0f353e109 100644 --- a/pkg/install/kamelets.go +++ b/pkg/install/kamelets.go @@ -19,8 +19,10 @@ package install import ( "context" + "errors" "fmt" "io/fs" + "net/http" "os" "path" "path/filepath" @@ -29,15 +31,18 @@ import ( "golang.org/x/sync/errgroup" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "github.com/pkg/errors" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" "github.com/apache/camel-k/pkg/util" "github.com/apache/camel-k/pkg/util/defaults" "github.com/apache/camel-k/pkg/util/kubernetes" + "github.com/apache/camel-k/pkg/util/patch" ) const ( @@ -45,6 +50,8 @@ const ( defaultKameletDir = "/kamelets/" ) +var hasServerSideApply = true + // KameletCatalog installs the bundled Kamelets into the specified namespace. func KameletCatalog(ctx context.Context, c client.Client, namespace string) error { kameletDir := os.Getenv(kameletDirEnv) @@ -75,7 +82,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro } // We may want to throttle the creation of Go routines if the number of bundled Kamelets increases. g.Go(func() error { - return createOrReplaceKamelet(gCtx, c, path.Join(kameletDir, f.Name()), namespace) + return applyKamelet(gCtx, c, path.Join(kameletDir, f.Name()), namespace) }) return nil }) @@ -86,9 +93,7 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro return g.Wait() } -func createOrReplaceKamelet(ctx context.Context, c client.Client, path string, namespace string) error { - fmt.Printf("Install file: %s in %s", path, namespace) - +func applyKamelet(ctx context.Context, c client.Client, path string, namespace string) error { content, err := util.ReadFile(path) if err != nil { return err @@ -98,38 +103,88 @@ func createOrReplaceKamelet(ctx context.Context, c client.Client, path string, n if err != nil { return err } - if k, ok := obj.(*v1alpha1.Kamelet); ok { - existing := &v1alpha1.Kamelet{} - err = c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: k.Name}, existing) - if err != nil { - if k8serrors.IsNotFound(err) { - existing = nil - } else { - return err - } - } + kamelet, ok := obj.(*v1alpha1.Kamelet) + if !ok { + return fmt.Errorf("cannot load Kamelet from file %q", path) + } + + kamelet.Namespace = namespace + + if kamelet.GetAnnotations() == nil { + kamelet.SetAnnotations(make(map[string]string)) + } + kamelet.GetAnnotations()[kamelVersionAnnotation] = defaults.Version - if existing == nil || existing.Labels[v1alpha1.KameletBundledLabel] == "true" { - if k.GetAnnotations() == nil { - k.SetAnnotations(make(map[string]string)) - } - k.GetAnnotations()[kamelVersionAnnotation] = defaults.Version - - if k.GetLabels() == nil { - k.SetLabels(make(map[string]string)) - } - k.GetLabels()[v1alpha1.KameletBundledLabel] = "true" - k.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true" - - err := ObjectOrCollect(ctx, c, namespace, nil, true, k) - if err != nil { - return errors.Wrapf(err, "could not create resource from file %q", path) - } + if kamelet.GetLabels() == nil { + kamelet.SetLabels(make(map[string]string)) + } + kamelet.GetLabels()[v1alpha1.KameletBundledLabel] = "true" + kamelet.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true" + + if hasServerSideApply { + err := serverSideApply(ctx, c, kamelet) + switch { + case err == nil: + break + case isIncompatibleServerError(err): + hasServerSideApply = false + default: + return fmt.Errorf("could not apply Kamelet from file %q: %w", path, err) } + } else { + return clientSideApply(ctx, c, kamelet) } + return nil } +func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error { + target, err := patch.PositiveApplyPatch(resource) + if err != nil { + return err + } + return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")) +} + +func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error { + err := c.Create(ctx, resource) + if err == nil { + return nil + } else if !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err) + } + object := &unstructured.Unstructured{} + object.SetNamespace(resource.GetNamespace()) + object.SetName(resource.GetName()) + object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind()) + err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object) + if err != nil { + return err + } + p, err := patch.PositiveMergePatch(object, resource) + if err != nil { + return err + } else if len(p) == 0 { + return nil + } + return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p)) +} + +func isIncompatibleServerError(err error) bool { + // First simpler check for older servers (i.e. OpenShift 3.11) + if strings.Contains(err.Error(), "415: Unsupported Media Type") { + return true + } + // 415: Unsupported media type means we're talking to a server which doesn't + // support server-side apply. + var serr *k8serrors.StatusError + if errors.As(err, &serr) { + return serr.Status().Code == http.StatusUnsupportedMediaType + } + // Non-StatusError means the error isn't because the server is incompatible. + return false +} + // KameletViewerRole installs the role that allows any user ro access kamelets in the global namespace. func KameletViewerRole(ctx context.Context, c client.Client, namespace string) error { if err := Resource(ctx, c, namespace, true, IdentityResourceCustomizer, "/viewer/user-global-kamelet-viewer-role.yaml"); err != nil { From 6018f927a3920cbb1bead52f53c7e67d62a969b9 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Mon, 6 Dec 2021 16:26:31 +0100 Subject: [PATCH 3/4] chore: Increase client-side throttling maximum burst --- pkg/client/client.go | 31 +++++++++++++++++-------------- pkg/cmd/operator/operator.go | 12 ++++++++++-- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 089336315a..3334e702af 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -23,10 +23,6 @@ import ( "os" "path/filepath" - "github.com/apache/camel-k/pkg/util" - - camelv1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1" - camelv1alpha1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1" user "github.com/mitchellh/go-homedir" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -36,18 +32,21 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" - clientscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest" - controller "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/apache/camel-k/pkg/apis" camel "github.com/apache/camel-k/pkg/client/camel/clientset/versioned" + camelv1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1" + camelv1alpha1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1" + "github.com/apache/camel-k/pkg/util" ) const ( @@ -57,7 +56,7 @@ const ( // Client is an abstraction for a k8s client. type Client interface { - controller.Client + ctrl.Client kubernetes.Interface CamelV1() camelv1.CamelV1Interface CamelV1alpha1() camelv1alpha1.CamelV1alpha1Interface @@ -77,7 +76,7 @@ type Provider struct { } type defaultClient struct { - controller.Client + ctrl.Client kubernetes.Interface camel camel.Interface scheme *runtime.Scheme @@ -116,16 +115,20 @@ func NewOutOfClusterClient(kubeconfig string) (Client, error) { // NewClient creates a new k8s client that can be used from outside or in the cluster. func NewClient(fastDiscovery bool) (Client, error) { - // Get a config to talk to the apiserver cfg, err := config.GetConfig() if err != nil { return nil, err } + return NewClientWithConfig(fastDiscovery, cfg) +} - scheme := clientscheme.Scheme +// NewClientWithConfig creates a new k8s client that can be used from outside or in the cluster. +func NewClientWithConfig(fastDiscovery bool, cfg *rest.Config) (Client, error) { + clientScheme := scheme.Scheme // Setup Scheme for all resources - if err := apis.AddToScheme(scheme); err != nil { + err := apis.AddToScheme(clientScheme) + if err != nil { return nil, err } @@ -145,11 +148,11 @@ func NewClient(fastDiscovery bool) (Client, error) { } // Create a new client to avoid using cache (enabled by default with controller-runtime client) - clientOptions := controller.Options{ - Scheme: scheme, + clientOptions := ctrl.Options{ + Scheme: clientScheme, Mapper: mapper, } - dynClient, err := controller.New(cfg, clientOptions) + dynClient, err := ctrl.New(cfg, clientOptions) if err != nil { return nil, err } diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index 1d140eae35..256d9d5b92 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/healthz" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -93,7 +94,14 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) { watchNamespace, err := getWatchNamespace() exitOnError(err, "failed to get watch namespace") - c, err := client.NewClient(false) + cfg, err := config.GetConfig() + exitOnError(err, "cannot get client config") + // Increase maximum burst that is used by client-side throttling, + // to prevent the requests made to apply the bundled Kamelets + // from being throttled. + cfg.QPS = 20 + cfg.Burst = 200 + c, err := client.NewClientWithConfig(false, cfg) exitOnError(err, "cannot initialize client") // We do not rely on the event broadcaster managed by controller runtime, @@ -178,7 +186,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) { exitOnError(controller.AddToManager(mgr), "") log.Info("Installing operator resources") - installCtx, installCancel := context.WithTimeout(context.TODO(), 1*time.Minute) + installCtx, installCancel := context.WithTimeout(context.Background(), 1*time.Minute) defer installCancel() install.OperatorStartupOptionalTools(installCtx, c, watchNamespace, operatorNamespace, log) From aad29f0637bc436eaa82f851170412bcb61e24d6 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Tue, 7 Dec 2021 09:28:51 +0100 Subject: [PATCH 4/4] chore: Update Kamelet update e2e test --- ...upgrade_test.go => kamelet_update_test.go} | 24 +++++++++---------- e2e/support/test_support.go | 14 ++++------- 2 files changed, 16 insertions(+), 22 deletions(-) rename e2e/common/{kamelet_upgrade_test.go => kamelet_update_test.go} (67%) diff --git a/e2e/common/kamelet_upgrade_test.go b/e2e/common/kamelet_update_test.go similarity index 67% rename from e2e/common/kamelet_upgrade_test.go rename to e2e/common/kamelet_update_test.go index a426f46ca1..c39a981ead 100644 --- a/e2e/common/kamelet_upgrade_test.go +++ b/e2e/common/kamelet_update_test.go @@ -32,26 +32,26 @@ import ( "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" ) -const preExistingKameletMarker = "pre-existing-kamelet" - -func TestKameletUpgrade(t *testing.T) { +const customLabel = "custom-label" +func TestBundleKameletUpdate(t *testing.T) { WithNewTestNamespace(t, func(ns string) { - Expect(createOperatorManagedKamelet(ns, "http-sink")()).To(Succeed()) // Going to be replaced - Expect(createUserManagedKamelet(ns, "ftp-sink")()).To(Succeed()) // Left intact by the operator - // Leverages the fact that the default kamelet catalog contains embedded "http-sink" and "ftp-sink" + Expect(createBundleKamelet(ns, "http-sink")()).To(Succeed()) // Going to be replaced + Expect(createUserKamelet(ns, "user-sink")()).To(Succeed()) // Left intact by the operator Expect(Kamel("install", "-n", ns).Execute()).To(Succeed()) - Eventually(KameletHasLabel("http-sink", ns, preExistingKameletMarker)).Should(BeFalse()) - Consistently(KameletHasLabel("ftp-sink", ns, preExistingKameletMarker), 5*time.Second, 1*time.Second).Should(BeTrue()) + Eventually(Kamelet("http-sink", ns)). + Should(WithTransform(KameletLabels, HaveKeyWithValue(customLabel, "true"))) + Consistently(Kamelet("user-sink", ns), 5*time.Second, 1*time.Second). + Should(WithTransform(KameletLabels, HaveKeyWithValue(customLabel, "true"))) // Cleanup Expect(Kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil()) }) } -func createOperatorManagedKamelet(ns string, name string) func() error { +func createBundleKamelet(ns string, name string) func() error { flow := map[string]interface{}{ "from": map[string]interface{}{ "uri": "kamelet:source", @@ -59,13 +59,13 @@ func createOperatorManagedKamelet(ns string, name string) func() error { } labels := map[string]string{ - preExistingKameletMarker: "true", + customLabel: "true", v1alpha1.KameletBundledLabel: "true", } return CreateKamelet(ns, name, flow, nil, labels) } -func createUserManagedKamelet(ns string, name string) func() error { +func createUserKamelet(ns string, name string) func() error { flow := map[string]interface{}{ "from": map[string]interface{}{ "uri": "kamelet:source", @@ -73,7 +73,7 @@ func createUserManagedKamelet(ns string, name string) func() error { } labels := map[string]string{ - preExistingKameletMarker: "true", + customLabel: "true", } return CreateKamelet(ns, name, flow, nil, labels) } diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index 9147233db1..8d3ab965b4 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -1367,17 +1367,11 @@ func Kamelet(name string, ns string) func() *v1alpha1.Kamelet { } } -func KameletHasLabel(name string, ns string, label string) func() bool { - return func() bool { - k := Kamelet(name, ns)() - if k == nil { - return false - } - if _, ok := k.Labels[label]; ok { - return true - } - return false +func KameletLabels(kamelet *v1alpha1.Kamelet) map[string]string { + if kamelet == nil { + return map[string]string{} } + return kamelet.GetLabels() } func ClusterDomainName() (string, error) {