Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: Faster operator startup #2814

Merged
merged 4 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,48 +32,48 @@ import (
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
)

const preExistingKameletMarker = "pre-existing-kamelet"

func TestKameletUpgrade(t *testing.T) {
const customLabel = "custom-label"

func TestBundleKameletUpdate(t *testing.T) {
WithNewTestNamespace(t, func(ns string) {
Expect(createOperatorManagedKamelet(ns, "http-sink")()).To(Succeed()) // Going to be replaced
Expect(createUserManagedKamelet(ns, "ftp-sink")()).To(Succeed()) // Left intact by the operator
// Leverages the fact that the default kamelet catalog contains embedded "http-sink" and "ftp-sink"
Expect(createBundleKamelet(ns, "http-sink")()).To(Succeed()) // Going to be replaced
Expect(createUserKamelet(ns, "user-sink")()).To(Succeed()) // Left intact by the operator

Expect(Kamel("install", "-n", ns).Execute()).To(Succeed())

Eventually(KameletHasLabel("http-sink", ns, preExistingKameletMarker)).Should(BeFalse())
Consistently(KameletHasLabel("ftp-sink", ns, preExistingKameletMarker), 5*time.Second, 1*time.Second).Should(BeTrue())
Eventually(Kamelet("http-sink", ns)).
Should(WithTransform(KameletLabels, HaveKeyWithValue(customLabel, "true")))
Consistently(Kamelet("user-sink", ns), 5*time.Second, 1*time.Second).
Should(WithTransform(KameletLabels, HaveKeyWithValue(customLabel, "true")))

// Cleanup
Expect(Kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil())
})
}

func createOperatorManagedKamelet(ns string, name string) func() error {
func createBundleKamelet(ns string, name string) func() error {
flow := map[string]interface{}{
"from": map[string]interface{}{
"uri": "kamelet:source",
},
}

labels := map[string]string{
preExistingKameletMarker: "true",
customLabel: "true",
v1alpha1.KameletBundledLabel: "true",
}
return CreateKamelet(ns, name, flow, nil, labels)
}

func createUserManagedKamelet(ns string, name string) func() error {
func createUserKamelet(ns string, name string) func() error {
flow := map[string]interface{}{
"from": map[string]interface{}{
"uri": "kamelet:source",
},
}

labels := map[string]string{
preExistingKameletMarker: "true",
customLabel: "true",
}
return CreateKamelet(ns, name, flow, nil, labels)
}
14 changes: 4 additions & 10 deletions e2e/support/test_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,17 +1367,11 @@ func Kamelet(name string, ns string) func() *v1alpha1.Kamelet {
}
}

func KameletHasLabel(name string, ns string, label string) func() bool {
return func() bool {
k := Kamelet(name, ns)()
if k == nil {
return false
}
if _, ok := k.Labels[label]; ok {
return true
}
return false
func KameletLabels(kamelet *v1alpha1.Kamelet) map[string]string {
if kamelet == nil {
return map[string]string{}
}
return kamelet.GetLabels()
}

func ClusterDomainName() (string, error) {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.19.1
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gopkg.in/inf.v0 v0.9.1
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.21.4
Expand Down
31 changes: 17 additions & 14 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
"os"
"path/filepath"

"github.com/apache/camel-k/pkg/util"

camelv1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1"
camelv1alpha1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1"
user "github.com/mitchellh/go-homedir"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -36,18 +32,21 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/client-go/kubernetes"
clientscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"

controller "sigs.k8s.io/controller-runtime/pkg/client"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/apache/camel-k/pkg/apis"
camel "github.com/apache/camel-k/pkg/client/camel/clientset/versioned"
camelv1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1"
camelv1alpha1 "github.com/apache/camel-k/pkg/client/camel/clientset/versioned/typed/camel/v1alpha1"
"github.com/apache/camel-k/pkg/util"
)

const (
Expand All @@ -57,7 +56,7 @@ const (

// Client is an abstraction for a k8s client.
type Client interface {
controller.Client
ctrl.Client
kubernetes.Interface
CamelV1() camelv1.CamelV1Interface
CamelV1alpha1() camelv1alpha1.CamelV1alpha1Interface
Expand All @@ -77,7 +76,7 @@ type Provider struct {
}

type defaultClient struct {
controller.Client
ctrl.Client
kubernetes.Interface
camel camel.Interface
scheme *runtime.Scheme
Expand Down Expand Up @@ -116,16 +115,20 @@ func NewOutOfClusterClient(kubeconfig string) (Client, error) {

// NewClient creates a new k8s client that can be used from outside or in the cluster.
func NewClient(fastDiscovery bool) (Client, error) {
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if err != nil {
return nil, err
}
return NewClientWithConfig(fastDiscovery, cfg)
}

scheme := clientscheme.Scheme
// NewClientWithConfig creates a new k8s client that can be used from outside or in the cluster.
func NewClientWithConfig(fastDiscovery bool, cfg *rest.Config) (Client, error) {
clientScheme := scheme.Scheme

// Setup Scheme for all resources
if err := apis.AddToScheme(scheme); err != nil {
err := apis.AddToScheme(clientScheme)
if err != nil {
return nil, err
}

Expand All @@ -145,11 +148,11 @@ func NewClient(fastDiscovery bool) (Client, error) {
}

// Create a new client to avoid using cache (enabled by default with controller-runtime client)
clientOptions := controller.Options{
Scheme: scheme,
clientOptions := ctrl.Options{
Scheme: clientScheme,
Mapper: mapper,
}
dynClient, err := controller.New(cfg, clientOptions)
dynClient, err := ctrl.New(cfg, clientOptions)
if err != nil {
return nil, err
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/cache"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -93,7 +94,14 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) {
watchNamespace, err := getWatchNamespace()
exitOnError(err, "failed to get watch namespace")

c, err := client.NewClient(false)
cfg, err := config.GetConfig()
exitOnError(err, "cannot get client config")
// Increase maximum burst that is used by client-side throttling,
// to prevent the requests made to apply the bundled Kamelets
// from being throttled.
cfg.QPS = 20
cfg.Burst = 200
c, err := client.NewClientWithConfig(false, cfg)
exitOnError(err, "cannot initialize client")

// We do not rely on the event broadcaster managed by controller runtime,
Expand Down Expand Up @@ -178,7 +186,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool) {
exitOnError(controller.AddToManager(mgr), "")

log.Info("Installing operator resources")
installCtx, installCancel := context.WithTimeout(context.TODO(), 1*time.Minute)
installCtx, installCancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer installCancel()
install.OperatorStartupOptionalTools(installCtx, c, watchNamespace, operatorNamespace, log)

Expand Down
Loading