diff --git a/e2e/common/cli/install_test.go b/e2e/common/cli/install_test.go index d1a76dc3c6..3de1f837a6 100644 --- a/e2e/common/cli/install_test.go +++ b/e2e/common/cli/install_test.go @@ -38,7 +38,7 @@ import ( "github.com/apache/camel-k/pkg/util/defaults" "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/util/openshift" - console "github.com/openshift/api/console/v1" + consolev1 "github.com/openshift/api/console/v1" corev1 "k8s.io/api/core/v1" ) @@ -98,11 +98,11 @@ func TestConsoleCliDownload(t *testing.T) { ocp, err := openshift.IsOpenShift(TestClient()) assert.Nil(t, err) - ok, err := kubernetes.IsAPIResourceInstalled(TestClient(), "console.openshift.io/v1", reflect.TypeOf(console.ConsoleCLIDownload{}).Name()) + ok, err := kubernetes.IsAPIResourceInstalled(TestClient(), "console.openshift.io/v1", reflect.TypeOf(consolev1.ConsoleCLIDownload{}).Name()) assert.Nil(t, err) if !ocp || !ok { - t.Skip("This test requires ConsoleCliDownload object which is available on OpenShift 4 only.") + t.Skip("This test requires ConsoleCliDownload object which is available on OpenShift 4+ only.") return } diff --git a/e2e/common/cron_test.go b/e2e/common/cron_test.go index dc26197db2..85b5ac211d 100644 --- a/e2e/common/cron_test.go +++ b/e2e/common/cron_test.go @@ -23,17 +23,29 @@ limitations under the License. package common import ( + "reflect" "testing" . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" . "github.com/apache/camel-k/e2e/support" v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/util/kubernetes" ) func TestRunCronExample(t *testing.T) { + ok, err := kubernetes.IsAPIResourceInstalled(TestClient(), batchv1.SchemeGroupVersion.Group, reflect.TypeOf(batchv1.CronJob{}).Name()) + assert.Nil(t, err) + + if !ok { + t.Skip("This test requires CronJob batch/v1 API installed.") + return + } + WithNewTestNamespace(t, func(ns string) { Expect(Kamel("install", "-n", ns).Execute()).To(Succeed()) diff --git a/e2e/support/test_support.go b/e2e/support/test_support.go index e1e87ec2c6..3fdd06077d 100644 --- a/e2e/support/test_support.go +++ b/e2e/support/test_support.go @@ -46,7 +46,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" coordination "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -599,12 +599,12 @@ func RouteStatus(ns string, name string) func() string { } } -func IntegrationCronJob(ns string, name string) func() *v1beta1.CronJob { - return func() *v1beta1.CronJob { - lst := v1beta1.CronJobList{ +func IntegrationCronJob(ns string, name string) func() *batchv1.CronJob { + return func() *batchv1.CronJob { + lst := batchv1.CronJobList{ TypeMeta: metav1.TypeMeta{ Kind: "CronJob", - APIVersion: v1beta1.SchemeGroupVersion.String(), + APIVersion: batchv1.SchemeGroupVersion.String(), }, } err := TestClient().List(TestContext, &lst, diff --git a/pkg/cmd/operator/operator.go b/pkg/cmd/operator/operator.go index 5b5bc29d76..912c767d62 100644 --- a/pkg/cmd/operator/operator.go +++ b/pkg/cmd/operator/operator.go @@ -23,6 +23,7 @@ import ( "fmt" "math/rand" "os" + "reflect" "runtime" "strconv" "strings" @@ -32,10 +33,10 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" - batchv1beta1 "k8s.io/api/batch/v1beta1" coordination "k8s.io/api/coordination/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -193,6 +194,22 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID exitOnError(err, "cannot create Integration label selector") selector := labels.NewSelector().Add(*hasIntegrationLabel) + selectors := cache.SelectorsByObject{ + &corev1.Pod{}: {Label: selector}, + &appsv1.Deployment{}: {Label: selector}, + &batchv1.Job{}: {Label: selector}, + &servingv1.Service{}: {Label: selector}, + } + + if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil { + selectors[&batchv1.CronJob{}] = struct { + Label labels.Selector + Field fields.Selector + }{ + Label: selector, + } + } + mgr, err := manager.New(c.GetConfig(), manager.Options{ Namespace: watchNamespace, EventBroadcaster: broadcaster, @@ -205,13 +222,7 @@ func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID MetricsBindAddress: ":" + strconv.Itoa(int(monitoringPort)), NewCache: cache.BuilderWithOptions( cache.Options{ - SelectorsByObject: cache.SelectorsByObject{ - &corev1.Pod{}: {Label: selector}, - &appsv1.Deployment{}: {Label: selector}, - &batchv1beta1.CronJob{}: {Label: selector}, - &batchv1.Job{}: {Label: selector}, - &servingv1.Service{}: {Label: selector}, - }, + SelectorsByObject: selectors, }, ), }) diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 572d2ef9a4..11eb816415 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -24,7 +24,7 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" - batchv1beta1 "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -205,8 +205,6 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error { })). // Watch for the owned Deployments Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})). - // Watch for the owned CronJobs - Owns(&batchv1beta1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})). // Watch for the Integration Pods Watches(&source.Kind{Type: &corev1.Pod{}}, handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { @@ -225,6 +223,11 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error { } })) + if ok, err := kubernetes.IsAPIResourceInstalled(c, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil { + // Watch for the owned CronJobs + b.Owns(&batchv1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})) + } + // Watch for the owned Knative Services conditionally if ok, err := kubernetes.IsAPIResourceInstalled(c, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil { return err diff --git a/pkg/controller/integration/monitor.go b/pkg/controller/integration/monitor.go index 91ea73f638..3e68a3a24f 100644 --- a/pkg/controller/integration/monitor.go +++ b/pkg/controller/integration/monitor.go @@ -25,7 +25,7 @@ import ( "strconv" appsv1 "k8s.io/api/apps/v1" - batchv1beta1 "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -185,9 +185,9 @@ func (action *monitorAction) newController(ctx context.Context, env *trait.Envir integration: integration, } case isConditionTrue(integration, v1.IntegrationConditionCronJobAvailable): - obj = getUpdatedController(env, &batchv1beta1.CronJob{}) + obj = getUpdatedController(env, &batchv1.CronJob{}) controller = &cronJobController{ - obj: obj.(*batchv1beta1.CronJob), + obj: obj.(*batchv1.CronJob), integration: integration, client: action.client, context: ctx, diff --git a/pkg/controller/integration/monitor_cronjob.go b/pkg/controller/integration/monitor_cronjob.go index 8a024df169..77bf162b8b 100644 --- a/pkg/controller/integration/monitor_cronjob.go +++ b/pkg/controller/integration/monitor_cronjob.go @@ -22,7 +22,6 @@ import ( "fmt" batchv1 "k8s.io/api/batch/v1" - batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime/pkg/client" @@ -33,7 +32,7 @@ import ( ) type cronJobController struct { - obj *batchv1beta1.CronJob + obj *batchv1.CronJob integration *v1.Integration client client.Client context context.Context diff --git a/pkg/trait/container.go b/pkg/trait/container.go index 7c3bc90193..3b3ca484c5 100644 --- a/pkg/trait/container.go +++ b/pkg/trait/container.go @@ -22,7 +22,7 @@ import ( "path" appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/intstr" @@ -298,7 +298,7 @@ func (t *containerTrait) configureContainer(e *Environment) error { } // CronJob - if err := e.Resources.VisitCronJobE(func(cron *v1beta1.CronJob) error { + if err := e.Resources.VisitCronJobE(func(cron *batchv1.CronJob) error { for _, envVar := range e.EnvVars { envvar.SetVar(&container.Env, envVar) } diff --git a/pkg/trait/cron.go b/pkg/trait/cron.go index 13c10ba8cd..0c078442c4 100644 --- a/pkg/trait/cron.go +++ b/pkg/trait/cron.go @@ -24,7 +24,6 @@ import ( "strings" batchv1 "k8s.io/api/batch/v1" - batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" @@ -173,7 +172,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) { } if t.ConcurrencyPolicy == "" { - t.ConcurrencyPolicy = string(batchv1beta1.ForbidConcurrent) + t.ConcurrencyPolicy = string(batchv1.ForbidConcurrent) } if (t.Schedule == "" && t.Components == "") && t.Fallback == nil { @@ -265,7 +264,7 @@ func (t *cronTrait) Apply(e *Environment) error { return nil } -func (t *cronTrait) getCronJobFor(e *Environment) *batchv1beta1.CronJob { +func (t *cronTrait) getCronJobFor(e *Environment) *batchv1.CronJob { annotations := make(map[string]string) if e.Integration.Annotations != nil { for k, v := range filterTransferableAnnotations(e.Integration.Annotations) { @@ -283,10 +282,10 @@ func (t *cronTrait) getCronJobFor(e *Environment) *batchv1beta1.CronJob { backoffLimit = *t.BackoffLimit } - cronjob := batchv1beta1.CronJob{ + cronjob := batchv1.CronJob{ TypeMeta: metav1.TypeMeta{ Kind: "CronJob", - APIVersion: batchv1beta1.SchemeGroupVersion.String(), + APIVersion: batchv1.SchemeGroupVersion.String(), }, ObjectMeta: metav1.ObjectMeta{ Name: e.Integration.Name, @@ -296,11 +295,11 @@ func (t *cronTrait) getCronJobFor(e *Environment) *batchv1beta1.CronJob { }, Annotations: e.Integration.Annotations, }, - Spec: batchv1beta1.CronJobSpec{ + Spec: batchv1.CronJobSpec{ Schedule: t.Schedule, - ConcurrencyPolicy: batchv1beta1.ConcurrencyPolicy(t.ConcurrencyPolicy), + ConcurrencyPolicy: batchv1.ConcurrencyPolicy(t.ConcurrencyPolicy), StartingDeadlineSeconds: t.StartingDeadlineSeconds, - JobTemplate: batchv1beta1.JobTemplateSpec{ + JobTemplate: batchv1.JobTemplateSpec{ Spec: batchv1.JobSpec{ ActiveDeadlineSeconds: &activeDeadline, BackoffLimit: &backoffLimit, diff --git a/pkg/trait/cron_test.go b/pkg/trait/cron_test.go index d6eb49169a..0f26124733 100644 --- a/pkg/trait/cron_test.go +++ b/pkg/trait/cron_test.go @@ -24,7 +24,7 @@ import ( passert "github.com/magiconair/properties/assert" "github.com/stretchr/testify/assert" - batchv1beta1 "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -428,7 +428,7 @@ func TestCronWithActiveDeadline(t *testing.T) { assert.Nil(t, ct.Fallback) assert.Contains(t, environment.Interceptors, "cron") - cronJob := environment.Resources.GetCronJob(func(job *batchv1beta1.CronJob) bool { return true }) + cronJob := environment.Resources.GetCronJob(func(job *batchv1.CronJob) bool { return true }) assert.NotNil(t, cronJob) assert.NotNil(t, cronJob.Spec.JobTemplate.Spec.ActiveDeadlineSeconds) @@ -501,7 +501,7 @@ func TestCronWithBackoffLimit(t *testing.T) { assert.Nil(t, ct.Fallback) assert.Contains(t, environment.Interceptors, "cron") - cronJob := environment.Resources.GetCronJob(func(job *batchv1beta1.CronJob) bool { return true }) + cronJob := environment.Resources.GetCronJob(func(job *batchv1.CronJob) bool { return true }) assert.NotNil(t, cronJob) assert.NotNil(t, cronJob.Spec.JobTemplate.Spec.ActiveDeadlineSeconds) diff --git a/pkg/trait/mount.go b/pkg/trait/mount.go index 10ab80f352..039e3ccc38 100644 --- a/pkg/trait/mount.go +++ b/pkg/trait/mount.go @@ -23,7 +23,7 @@ import ( "strings" appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/utils/pointer" @@ -119,7 +119,7 @@ func (t *mountTrait) Apply(e *Environment) error { } // CronJob - if err := e.Resources.VisitCronJobE(func(cron *v1beta1.CronJob) error { + if err := e.Resources.VisitCronJobE(func(cron *batchv1.CronJob) error { volumes = &cron.Spec.JobTemplate.Spec.Template.Spec.Volumes visited = true return nil diff --git a/pkg/trait/pod.go b/pkg/trait/pod.go index 840f20dc8d..c6415189e4 100644 --- a/pkg/trait/pod.go +++ b/pkg/trait/pod.go @@ -21,7 +21,7 @@ import ( "fmt" appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -71,7 +71,7 @@ func (t *podTrait) Apply(e *Environment) error { } switch strategy { case ControllerStrategyCronJob: - e.Resources.VisitCronJob(func(c *v1beta1.CronJob) { + e.Resources.VisitCronJob(func(c *batchv1.CronJob) { if c.Name == e.Integration.Name { if patchedPodSpec, err = t.applyChangesTo(&c.Spec.JobTemplate.Spec.Template.Spec, changes); err == nil { c.Spec.JobTemplate.Spec.Template.Spec = *patchedPodSpec diff --git a/pkg/trait/test_support.go b/pkg/trait/test_support.go index 137f0f91d3..31d77adc68 100644 --- a/pkg/trait/test_support.go +++ b/pkg/trait/test_support.go @@ -21,7 +21,7 @@ import ( serving "knative.dev/serving/pkg/apis/serving/v1" appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -93,12 +93,12 @@ func createNominalKnativeServiceTraitTest() (*Environment, *serving.Service) { return environment, knativeService } -func createNominalCronJobTraitTest() (*Environment, *v1beta1.CronJob) { - cronJob := &v1beta1.CronJob{ +func createNominalCronJobTraitTest() (*Environment, *batchv1.CronJob) { + cronJob := &batchv1.CronJob{ ObjectMeta: metav1.ObjectMeta{ Name: "integration-name", }, - Spec: v1beta1.CronJobSpec{}, + Spec: batchv1.CronJobSpec{}, } environment := &Environment{ diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go index 492c3c22de..a53ec98410 100644 --- a/pkg/trait/trait_types.go +++ b/pkg/trait/trait_types.go @@ -27,7 +27,7 @@ import ( "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -326,7 +326,7 @@ func (e *Environment) GetIntegrationPodSpec() *corev1.PodSpec { } // Cronjob - cronJob := e.Resources.GetCronJob(func(c *v1beta1.CronJob) bool { + cronJob := e.Resources.GetCronJob(func(c *batchv1.CronJob) bool { return c.Name == e.Integration.Name }) if cronJob != nil { diff --git a/pkg/util/kubernetes/collection.go b/pkg/util/kubernetes/collection.go index 0a0579f418..00facb2447 100644 --- a/pkg/util/kubernetes/collection.go +++ b/pkg/util/kubernetes/collection.go @@ -19,7 +19,7 @@ package kubernetes import ( appsv1 "k8s.io/api/apps/v1" - "k8s.io/api/batch/v1beta1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -278,9 +278,9 @@ func (c *Collection) GetRoute(filter func(*routev1.Route) bool) *routev1.Route { } // GetCronJob returns a CronJob that matches the given function. -func (c *Collection) GetCronJob(filter func(job *v1beta1.CronJob) bool) *v1beta1.CronJob { - var retValue *v1beta1.CronJob - c.VisitCronJob(func(re *v1beta1.CronJob) { +func (c *Collection) GetCronJob(filter func(job *batchv1.CronJob) bool) *batchv1.CronJob { + var retValue *batchv1.CronJob + c.VisitCronJob(func(re *batchv1.CronJob) { if filter(re) { retValue = re } @@ -289,18 +289,18 @@ func (c *Collection) GetCronJob(filter func(job *v1beta1.CronJob) bool) *v1beta1 } // VisitCronJob executes the visitor function on all CronJob resources. -func (c *Collection) VisitCronJob(visitor func(*v1beta1.CronJob)) { +func (c *Collection) VisitCronJob(visitor func(*batchv1.CronJob)) { c.Visit(func(res runtime.Object) { - if conv, ok := res.(*v1beta1.CronJob); ok { + if conv, ok := res.(*batchv1.CronJob); ok { visitor(conv) } }) } // VisitCronJobE executes the visitor function on all CronJob resources. -func (c *Collection) VisitCronJobE(visitor func(*v1beta1.CronJob) error) error { +func (c *Collection) VisitCronJobE(visitor func(*batchv1.CronJob) error) error { return c.VisitE(func(res runtime.Object) error { - if conv, ok := res.(*v1beta1.CronJob); ok { + if conv, ok := res.(*batchv1.CronJob); ok { return visitor(conv) } @@ -383,7 +383,7 @@ func (c *Collection) VisitContainer(visitor func(container *corev1.Container)) { visitor(cntref) } }) - c.VisitCronJob(func(c *v1beta1.CronJob) { + c.VisitCronJob(func(c *batchv1.CronJob) { for idx := range c.Spec.JobTemplate.Spec.Template.Spec.Containers { cntref := &c.Spec.JobTemplate.Spec.Template.Spec.Containers[idx] visitor(cntref) @@ -405,7 +405,7 @@ func (c *Collection) GetController(filter func(object ctrl.Object) bool) ctrl.Ob if svc != nil { return svc } - cj := c.GetCronJob(func(job *v1beta1.CronJob) bool { + cj := c.GetCronJob(func(job *batchv1.CronJob) bool { return filter(job) }) if cj != nil { @@ -422,7 +422,7 @@ func (c *Collection) VisitPodSpec(visitor func(container *corev1.PodSpec)) { c.VisitKnativeConfigurationSpec(func(cs *serving.ConfigurationSpec) { visitor(&cs.Template.Spec.PodSpec) }) - c.VisitCronJob(func(d *v1beta1.CronJob) { + c.VisitCronJob(func(d *batchv1.CronJob) { visitor(&d.Spec.JobTemplate.Spec.Template.Spec) }) } @@ -435,7 +435,7 @@ func (c *Collection) VisitPodTemplateMeta(visitor func(meta *metav1.ObjectMeta)) c.VisitKnativeConfigurationSpec(func(cs *serving.ConfigurationSpec) { visitor(&cs.Template.ObjectMeta) }) - c.VisitCronJob(func(d *v1beta1.CronJob) { + c.VisitCronJob(func(d *batchv1.CronJob) { visitor(&d.Spec.JobTemplate.Spec.Template.ObjectMeta) }) }