diff --git a/e2e/common/kamelet_upgrade_test.go b/e2e/common/kamelet_update_test.go similarity index 67% rename from e2e/common/kamelet_upgrade_test.go rename to e2e/common/kamelet_update_test.go index a426f46ca1..c39a981ead 100644 --- a/e2e/common/kamelet_upgrade_test.go +++ b/e2e/common/kamelet_update_test.go @@ -32,26 +32,26 @@ import ( "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" ) -const preExistingKameletMarker = "pre-existing-kamelet" - -func TestKameletUpgrade(t *testing.T) { +const customLabel = "custom-label" +func TestBundleKameletUpdate(t *testing.T) { WithNewTestNamespace(t, func(ns string) { - Expect(createOperatorManagedKamelet(ns, "http-sink")()).To(Succeed()) // Going to be replaced - Expect(createUserManagedKamelet(ns, "ftp-sink")()).To(Succeed()) // Left intact by the operator - // Leverages the fact that the default kamelet catalog contains embedded "http-sink" and "ftp-sink" + Expect(createBundleKamelet(ns, "http-sink")()).To(Succeed()) // Going to be replaced + Expect(createUserKamelet(ns, "user-sink")()).To(Succeed()) // Left intact by the operator Expect(Kamel("install", "-n", ns).Execute()).To(Succeed()) - Eventually(KameletHasLabel("http-sink", ns, preExistingKameletMarker)).Should(BeFalse()) - Consistently(KameletHasLabel("ftp-sink", ns, preExistingKameletMarker), 5*time.Second, 1*time.Second).Should(BeTrue()) + Eventually(Kamelet("http-sink", ns)). + Should(WithTransform(KameletLabels, HaveKeyWithValue(customLabel, "true"))) + Consistently(Kamelet("user-sink", ns), 5*time.Second, 1*time.Second). + Should(WithTransform(KameletLabels, HaveKeyWithValue(customLabel, "true"))) // Cleanup Expect(Kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil()) }) } -func createOperatorManagedKamelet(ns string, name string) func() error { +func createBundleKamelet(ns string, name string) func() error { flow := map[string]interface{}{ "from": map[string]interface{}{ "uri": "kamelet:source", @@ -59,13 +59,13 @@ func createOperatorManagedKamelet(ns string, name string) func() error { } labels := map[string]string{ - preExistingKameletMarker: "true", + customLabel: "true", v1alpha1.KameletBundledLabel: "true", } return CreateKamelet(ns, name, flow, nil, labels) } -func createUserManagedKamelet(ns string, name string) func() error { +func createUserKamelet(ns string, name string) func() error { flow := map[string]interface{}{ "from": map[string]interface{}{ "uri": "kamelet:source", @@ -73,7 +73,7 @@ func createUserManagedKamelet(ns string, name string) func() error { } labels := map[string]string{ - preExistingKameletMarker: "true", + customLabel: "true", } return CreateKamelet(ns, name, flow, nil, labels) } diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index 9147233db1..8d3ab965b4 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -1367,17 +1367,11 @@ func Kamelet(name string, ns string) func() *v1alpha1.Kamelet { } } -func KameletHasLabel(name string, ns string, label string) func() bool { - return func() bool { - k := Kamelet(name, ns)() - if k == nil { - return false - } - if _, ok := k.Labels[label]; ok { - return true - } - return false +func KameletLabels(kamelet *v1alpha1.Kamelet) map[string]string { + if kamelet == nil { + return map[string]string{} } + return kamelet.GetLabels() } func ClusterDomainName() (string, error) { diff --git a/go.mod b/go.mod index 34497e8339..e11b03b66a 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( go.uber.org/multierr v1.6.0 go.uber.org/zap v1.19.1 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c gopkg.in/inf.v0 v0.9.1 gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.21.4 diff --git a/pkg/client/client.go b/pkg/client/client.go index 089336315a..3334e702af 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -23,10 +23,6 @@ import ( "os" "path/filepath" - "github.com/apache/camel-k/pkg/util" - - camelv1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1" - camelv1alpha1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1" user "github.com/mitchellh/go-homedir" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -36,18 +32,21 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" - clientscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest" - controller "sigs.k8s.io/controller-runtime/pkg/client" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/apache/camel-k/pkg/apis" camel "github.com/apache/camel-k/pkg/client/camel/clientset/versioned" + camelv1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1" + camelv1alpha1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1" + "github.com/apache/camel-k/pkg/util" ) const ( @@ -57,7 +56,7 @@ const ( // Client is an abstraction for a k8s client. type Client interface { - controller.Client + ctrl.Client kubernetes.Interface CamelV1() camelv1.CamelV1Interface CamelV1alpha1() camelv1alpha1.CamelV1alpha1Interface @@ -77,7 +76,7 @@ type Provider struct { } type defaultClient struct { - controller.Client + ctrl.Client kubernetes.Interface camel camel.Interface scheme *runtime.Scheme @@ -116,16 +115,20 @@ func NewOutOfClusterClient(kubeconfig string) (Client, error) { // NewClient creates a new k8s client that can be used from outside or in the cluster. func NewClient(fastDiscovery bool) (Client, error) { - // Get a config to talk to the apiserver cfg, err := config.GetConfig() if err != nil { return nil, err } + return NewClientWithConfig(fastDiscovery, cfg) +} - scheme := clientscheme.Scheme +// NewClientWithConfig creates a new k8s client that can be used from outside or in the cluster. +func NewClientWithConfig(fastDiscovery bool, cfg *rest.Config) (Client, error) { + clientScheme := scheme.Scheme // Setup Scheme for all resources - if err := apis.AddToScheme(scheme); err != nil { + err := apis.AddToScheme(clientScheme) + if err != nil { return nil, err } @@ -145,11 +148,11 @@ func NewClient(fastDiscovery bool) (Client, error) { } // Create a new client to avoid using cache (enabled by default with controller-runtime client) - clientOptions := controller.Options{ - Scheme: scheme, + clientOptions := ctrl.Options{ + Scheme: clientScheme, Mapper: mapper, } - dynClient, err := controller.New(cfg, clientOptions) + dynClient, err := ctrl.New(cfg, clientOptions) if err != nil { return nil, err } diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index 1d140eae35..256d9d5b92 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/healthz" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -93,7 +94,14 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) { watchNamespace, err := getWatchNamespace() exitOnError(err, "failed to get watch namespace") - c, err := client.NewClient(false) + cfg, err := config.GetConfig() + exitOnError(err, "cannot get client config") + // Increase maximum burst that is used by client-side throttling, + // to prevent the requests made to apply the bundled Kamelets + // from being throttled. + cfg.QPS = 20 + cfg.Burst = 200 + c, err := client.NewClientWithConfig(false, cfg) exitOnError(err, "cannot initialize client") // We do not rely on the event broadcaster managed by controller runtime, @@ -178,7 +186,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) { exitOnError(controller.AddToManager(mgr), "") log.Info("Installing operator resources") - installCtx, installCancel := context.WithTimeout(context.TODO(), 1*time.Minute) + installCtx, installCancel := context.WithTimeout(context.Background(), 1*time.Minute) defer installCancel() install.OperatorStartupOptionalTools(installCtx, c, watchNamespace, operatorNamespace, log) diff --git a/pkg/install/kamelets.go b/pkg/install/kamelets.go index 91bebee673..e0f353e109 100644 --- a/pkg/install/kamelets.go +++ b/pkg/install/kamelets.go @@ -1,12 +1,12 @@ /* Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with +contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at +the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 +http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -19,22 +19,30 @@ package install import ( "context" + "errors" "fmt" - "io/ioutil" + "io/fs" + "net/http" "os" "path" + "path/filepath" "strings" - "github.com/apache/camel-k/pkg/util" + "golang.org/x/sync/errgroup" - "github.com/pkg/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" "github.com/apache/camel-k/pkg/client" + "github.com/apache/camel-k/pkg/util" "github.com/apache/camel-k/pkg/util/defaults" "github.com/apache/camel-k/pkg/util/kubernetes" + "github.com/apache/camel-k/pkg/util/patch" ) const ( @@ -42,7 +50,9 @@ const ( defaultKameletDir = "/kamelets/" ) -// KameletCatalog installs the bundled KameletCatalog into one namespace. +var hasServerSideApply = true + +// KameletCatalog installs the bundled Kamelets into the specified namespace. func KameletCatalog(ctx context.Context, c client.Client, namespace string) error { kameletDir := os.Getenv(kameletDirEnv) if kameletDir == "" { @@ -58,60 +68,123 @@ func KameletCatalog(ctx context.Context, c client.Client, namespace string) erro return fmt.Errorf("kamelet directory %q is a file", kameletDir) } - files, err := ioutil.ReadDir(kameletDir) + g, gCtx := errgroup.WithContext(ctx) + + err = filepath.WalkDir(kameletDir, func(p string, f fs.DirEntry, err error) error { + if err != nil { + return err + } + if f.IsDir() && f.Name() != d.Name() { + return fs.SkipDir + } + if !(strings.HasSuffix(f.Name(), ".yaml") || strings.HasSuffix(f.Name(), ".yml")) { + return nil + } + // We may want to throttle the creation of Go routines if the number of bundled Kamelets increases. + g.Go(func() error { + return applyKamelet(gCtx, c, path.Join(kameletDir, f.Name()), namespace) + }) + return nil + }) if err != nil { return err } - for _, file := range files { - if file.IsDir() || !(strings.HasSuffix(file.Name(), ".yaml") || strings.HasSuffix(file.Name(), ".yml")) { - continue - } + return g.Wait() +} - content, err := util.ReadFile(path.Join(kameletDir, file.Name())) - if err != nil { - return err - } +func applyKamelet(ctx context.Context, c client.Client, path string, namespace string) error { + content, err := util.ReadFile(path) + if err != nil { + return err + } - obj, err := kubernetes.LoadResourceFromYaml(c.GetScheme(), string(content)) - if err != nil { - return err - } - if k, ok := obj.(*v1alpha1.Kamelet); ok { - existing := &v1alpha1.Kamelet{} - err = c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: k.Name}, existing) - if err != nil { - if k8serrors.IsNotFound(err) { - existing = nil - } else { - return err - } - } - - if existing == nil || existing.Labels[v1alpha1.KameletBundledLabel] == "true" { - if k.GetAnnotations() == nil { - k.SetAnnotations(make(map[string]string)) - } - k.GetAnnotations()[kamelVersionAnnotation] = defaults.Version - - if k.GetLabels() == nil { - k.SetLabels(make(map[string]string)) - } - k.GetLabels()[v1alpha1.KameletBundledLabel] = "true" - k.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true" - - err := ObjectOrCollect(ctx, c, namespace, nil, true, k) - if err != nil { - return errors.Wrapf(err, "could not create resource from file %q", path.Join(kameletDir, file.Name())) - } - } + obj, err := kubernetes.LoadResourceFromYaml(c.GetScheme(), string(content)) + if err != nil { + return err + } + kamelet, ok := obj.(*v1alpha1.Kamelet) + if !ok { + return fmt.Errorf("cannot load Kamelet from file %q", path) + } + + kamelet.Namespace = namespace + + if kamelet.GetAnnotations() == nil { + kamelet.SetAnnotations(make(map[string]string)) + } + kamelet.GetAnnotations()[kamelVersionAnnotation] = defaults.Version + if kamelet.GetLabels() == nil { + kamelet.SetLabels(make(map[string]string)) + } + kamelet.GetLabels()[v1alpha1.KameletBundledLabel] = "true" + kamelet.GetLabels()[v1alpha1.KameletReadOnlyLabel] = "true" + + if hasServerSideApply { + err := serverSideApply(ctx, c, kamelet) + switch { + case err == nil: + break + case isIncompatibleServerError(err): + hasServerSideApply = false + default: + return fmt.Errorf("could not apply Kamelet from file %q: %w", path, err) } + } else { + return clientSideApply(ctx, c, kamelet) } return nil } +func serverSideApply(ctx context.Context, c client.Client, resource runtime.Object) error { + target, err := patch.PositiveApplyPatch(resource) + if err != nil { + return err + } + return c.Patch(ctx, target, ctrl.Apply, ctrl.ForceOwnership, ctrl.FieldOwner("camel-k-operator")) +} + +func clientSideApply(ctx context.Context, c client.Client, resource ctrl.Object) error { + err := c.Create(ctx, resource) + if err == nil { + return nil + } else if !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("error during create resource: %s/%s: %w", resource.GetNamespace(), resource.GetName(), err) + } + object := &unstructured.Unstructured{} + object.SetNamespace(resource.GetNamespace()) + object.SetName(resource.GetName()) + object.SetGroupVersionKind(resource.GetObjectKind().GroupVersionKind()) + err = c.Get(ctx, ctrl.ObjectKeyFromObject(object), object) + if err != nil { + return err + } + p, err := patch.PositiveMergePatch(object, resource) + if err != nil { + return err + } else if len(p) == 0 { + return nil + } + return c.Patch(ctx, resource, ctrl.RawPatch(types.MergePatchType, p)) +} + +func isIncompatibleServerError(err error) bool { + // First simpler check for older servers (i.e. OpenShift 3.11) + if strings.Contains(err.Error(), "415: Unsupported Media Type") { + return true + } + // 415: Unsupported media type means we're talking to a server which doesn't + // support server-side apply. + var serr *k8serrors.StatusError + if errors.As(err, &serr) { + return serr.Status().Code == http.StatusUnsupportedMediaType + } + // Non-StatusError means the error isn't because the server is incompatible. + return false +} + // KameletViewerRole installs the role that allows any user ro access kamelets in the global namespace. func KameletViewerRole(ctx context.Context, c client.Client, namespace string) error { if err := Resource(ctx, c, namespace, true, IdentityResourceCustomizer, "/viewer/user-global-kamelet-viewer-role.yaml"); err != nil {