From a2cab396502bb4d3abd764d5557a428535097fa2 Mon Sep 17 00:00:00 2001 From: Roshan Bhatia Date: Thu, 27 Feb 2025 14:32:04 -0800 Subject: [PATCH] (feat) allow selecting based on status --- .golangci.yml | 27 + ROADMAP.md | 1 - cmd/controller/main.go | 22 +- deploy/samples/example-job-template.yaml | 48 ++ .../kubevent/v1alpha1/crd_validation_test.go | 84 ++- pkg/apis/kubevent/v1alpha1/deepcopy.go | 51 +- pkg/apis/kubevent/v1alpha1/deepcopy_test.go | 65 +- pkg/apis/kubevent/v1alpha1/types.go | 41 +- pkg/apis/kubevent/v1alpha1/types_test.go | 170 +++++- pkg/controller/event_controller.go | 21 +- pkg/controller/job_creator_test.go | 211 ++++++- pkg/controller/status_controller.go | 561 ++++++++++++++++++ pkg/controller/status_controller_test.go | 318 ++++++++++ pkg/controller/template_matcher_test.go | 184 +++++- 14 files changed, 1754 insertions(+), 50 deletions(-) create mode 100644 .golangci.yml create mode 100644 pkg/controller/status_controller.go create mode 100644 pkg/controller/status_controller_test.go diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..b1b8c68 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,27 @@ +linters-settings: + errcheck: + check-type-assertions: true + check-blank: true + exclude-functions: + - (*k8s.io/client-go/tools/cache.SharedIndexInformer).AddEventHandler + - (*k8s.io/client-go/tools/cache.SharedIndexInformer).AddEventHandlerWithResyncPeriod + +linters: + enable: + - errcheck + - gofmt + - govet + - gosimple + - ineffassign + - staticcheck + - unused + - misspell + +issues: + exclude-rules: + - linters: + - errcheck + text: "AddEventHandler" + - linters: + - errcheck + text: "AddEventHandlerWithResyncPeriod" \ No newline at end of file diff --git a/ROADMAP.md b/ROADMAP.md index 4d2926a..b91de72 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -1,6 +1,5 @@ # Roadmap -- [ ] In addition to kubernetes events, expose the ability to select on a status - [ ] Create helm chart - [ ] Expose metrics and traces via OTEL - [ ] Have EventTriggeredJob create a meta block (inherit xyz from parent, like annotations, namespace, etc.) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index a383667..75da8ee 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -5,6 +5,7 @@ import ( "github.com/roshbhatia/kubevent/pkg/controller" "github.com/roshbhatia/kubevent/pkg/util" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" @@ -29,11 +30,26 @@ func main() { klog.Fatalf("Error building kubernetes clientset: %s", err.Error()) } + dynamicClient, err := dynamic.NewForConfig(cfg) + if err != nil { + klog.Fatalf("Error building dynamic client: %s", err.Error()) + } + stopCh := util.SetupSignalHandler() + // Create controllers eventController := controller.NewEventController(kubeClient) - - if err := eventController.Run(2, stopCh); err != nil { - klog.Fatalf("Error running controller: %s", err.Error()) + statusController := controller.NewStatusController(kubeClient, dynamicClient) + + // Run the event controller + go func() { + if err := eventController.Run(2, stopCh); err != nil { + klog.Fatalf("Error running event controller: %s", err.Error()) + } + }() + + // Run the status controller (blocking) + if err := statusController.Run(2, stopCh); err != nil { + klog.Fatalf("Error running status controller: %s", err.Error()) } } diff --git a/deploy/samples/example-job-template.yaml b/deploy/samples/example-job-template.yaml index 27da035..9227d86 100644 --- a/deploy/samples/example-job-template.yaml +++ b/deploy/samples/example-job-template.yaml @@ -19,4 +19,52 @@ spec: - name: notification image: busybox command: ["sh", "-c", "echo 'Pod was restarted!' && sleep 5"] + restartPolicy: Never +--- +apiVersion: kubevent.roshanbhatia.com/v1alpha1 +kind: EventTriggeredJob +metadata: + name: pod-ready-notification +spec: + statusSelector: + resourceKind: "Pod" + namePattern: "*" + namespacePattern: "default" + labelSelector: + matchLabels: + app: myapp + conditions: + - type: "Ready" + status: "True" + jobTemplate: + spec: + template: + spec: + containers: + - name: notification + image: busybox + command: ["sh", "-c", "echo 'Pod is now Ready!' && sleep 5"] + restartPolicy: Never +--- +apiVersion: kubevent.roshanbhatia.com/v1alpha1 +kind: EventTriggeredJob +metadata: + name: deployment-available-notification +spec: + statusSelector: + resourceKind: "Deployment" + namePattern: "web-*" + conditions: + - type: "Available" + status: "True" + - type: "Progressing" + status: "False" + jobTemplate: + spec: + template: + spec: + containers: + - name: notification + image: busybox + command: ["sh", "-c", "echo 'Deployment is now Available!' && sleep 5"] restartPolicy: Never \ No newline at end of file diff --git a/pkg/apis/kubevent/v1alpha1/crd_validation_test.go b/pkg/apis/kubevent/v1alpha1/crd_validation_test.go index ee2a11b..3b63533 100644 --- a/pkg/apis/kubevent/v1alpha1/crd_validation_test.go +++ b/pkg/apis/kubevent/v1alpha1/crd_validation_test.go @@ -12,18 +12,18 @@ import ( ) func TestCRDValidation(t *testing.T) { - // Create a valid template - validTemplate := &EventTriggeredJob{ + // Create a valid template with EventSelector + validEventTemplate := &EventTriggeredJob{ TypeMeta: metav1.TypeMeta{ APIVersion: "kubevent.roshanbhatia.com/v1alpha1", Kind: "EventTriggeredJob", }, ObjectMeta: metav1.ObjectMeta{ - Name: "valid-template", + Name: "valid-event-template", Namespace: "default", }, Spec: EventTriggeredJobSpec{ - EventSelector: EventSelector{ + EventSelector: &EventSelector{ ResourceKind: "Pod", NamePattern: "test-*", EventTypes: []string{"CREATE", "DELETE"}, @@ -47,13 +47,79 @@ func TestCRDValidation(t *testing.T) { }, } - // Validate required fields - if validTemplate.Spec.EventSelector.ResourceKind == "" { - t.Errorf("ResourceKind is required but allowed to be empty") + // Create a valid template with StatusSelector + validStatusTemplate := &EventTriggeredJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "kubevent.roshanbhatia.com/v1alpha1", + Kind: "EventTriggeredJob", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "valid-status-template", + Namespace: "default", + }, + Spec: EventTriggeredJobSpec{ + StatusSelector: &StatusSelector{ + ResourceKind: "Pod", + NamePattern: "test-*", + Conditions: []StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + }, + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "busybox", + Command: []string{"echo", "test"}, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + }, + } + + // Validate required fields for EventSelector + if validEventTemplate.Spec.EventSelector != nil { + if validEventTemplate.Spec.EventSelector.ResourceKind == "" { + t.Errorf("ResourceKind is required but allowed to be empty in EventSelector") + } + + if len(validEventTemplate.Spec.EventSelector.EventTypes) == 0 { + t.Errorf("EventTypes is required but allowed to be empty in EventSelector") + } + } + + // Validate required fields for StatusSelector + if validStatusTemplate.Spec.StatusSelector != nil { + if validStatusTemplate.Spec.StatusSelector.ResourceKind == "" { + t.Errorf("ResourceKind is required but allowed to be empty in StatusSelector") + } + + if len(validStatusTemplate.Spec.StatusSelector.Conditions) == 0 { + t.Errorf("At least one condition is required in StatusSelector") + } + } + + // Verify at least one selector is provided + invalidTemplate := &EventTriggeredJob{ + Spec: EventTriggeredJobSpec{ + EventSelector: nil, + StatusSelector: nil, + }, } - if len(validTemplate.Spec.EventSelector.EventTypes) == 0 { - t.Errorf("EventTypes is required but allowed to be empty") + if invalidTemplate.Spec.EventSelector == nil && invalidTemplate.Spec.StatusSelector == nil { + // This is expected to fail in actual validation + t.Logf("Correctly identified that at least one selector is required") } // Read the CRD file diff --git a/pkg/apis/kubevent/v1alpha1/deepcopy.go b/pkg/apis/kubevent/v1alpha1/deepcopy.go index 5840da4..31629a8 100644 --- a/pkg/apis/kubevent/v1alpha1/deepcopy.go +++ b/pkg/apis/kubevent/v1alpha1/deepcopy.go @@ -67,7 +67,16 @@ func (in *EventTriggeredJobList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. func (in *EventTriggeredJobSpec) DeepCopyInto(out *EventTriggeredJobSpec) { *out = *in - in.EventSelector.DeepCopyInto(&out.EventSelector) + if in.EventSelector != nil { + in, out := &in.EventSelector, &out.EventSelector + *out = new(EventSelector) + (*in).DeepCopyInto(*out) + } + if in.StatusSelector != nil { + in, out := &in.StatusSelector, &out.StatusSelector + *out = new(StatusSelector) + (*in).DeepCopyInto(*out) + } in.JobTemplate.DeepCopyInto(&out.JobTemplate) } @@ -131,3 +140,43 @@ func (in *EventTriggeredJobStatus) DeepCopy() *EventTriggeredJobStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. +func (in *StatusCondition) DeepCopyInto(out *StatusCondition) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StatusCondition. +func (in *StatusCondition) DeepCopy() *StatusCondition { + if in == nil { + return nil + } + out := new(StatusCondition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. +func (in *StatusSelector) DeepCopyInto(out *StatusSelector) { + *out = *in + if in.LabelSelector != nil { + in, out := &in.LabelSelector, &out.LabelSelector + *out = new(metav1.LabelSelector) + (*in).DeepCopyInto(*out) + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]StatusCondition, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StatusSelector. +func (in *StatusSelector) DeepCopy() *StatusSelector { + if in == nil { + return nil + } + out := new(StatusSelector) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/kubevent/v1alpha1/deepcopy_test.go b/pkg/apis/kubevent/v1alpha1/deepcopy_test.go index 0877ac1..dc4fc82 100644 --- a/pkg/apis/kubevent/v1alpha1/deepcopy_test.go +++ b/pkg/apis/kubevent/v1alpha1/deepcopy_test.go @@ -13,7 +13,7 @@ func TestEventTriggeredJobDeepCopy(t *testing.T) { Namespace: "default", }, Spec: EventTriggeredJobSpec{ - EventSelector: EventSelector{ + EventSelector: &EventSelector{ ResourceKind: "Pod", NamePattern: "test-*", EventTypes: []string{"CREATE", "DELETE"}, @@ -111,7 +111,7 @@ func TestEventTriggeredJobListDeepCopy(t *testing.T) { Namespace: "default", }, Spec: EventTriggeredJobSpec{ - EventSelector: EventSelector{ + EventSelector: &EventSelector{ ResourceKind: "Pod", }, }, @@ -122,7 +122,7 @@ func TestEventTriggeredJobListDeepCopy(t *testing.T) { Namespace: "kube-system", }, Spec: EventTriggeredJobSpec{ - EventSelector: EventSelector{ + EventSelector: &EventSelector{ ResourceKind: "Deployment", }, }, @@ -177,3 +177,62 @@ func TestEventTriggeredJobListDeepCopy(t *testing.T) { t.Errorf("Copy was affected by change to original Items length") } } + +// Test StatusSelector DeepCopy +func TestStatusSelectorDeepCopy(t *testing.T) { + original := &StatusSelector{ + ResourceKind: "Pod", + NamePattern: "test-*", + Conditions: []StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + { + Type: "PodScheduled", + Status: "True", + Operator: "Equal", + }, + }, + } + + // Test DeepCopy + copy := original.DeepCopy() + + // Verify the copy is not the same object + if copy == original { + t.Errorf("DeepCopy returned the same object pointer") + } + + // Verify all fields are copied correctly + if copy.ResourceKind != original.ResourceKind { + t.Errorf("Expected ResourceKind %s, got %s", original.ResourceKind, copy.ResourceKind) + } + if copy.NamePattern != original.NamePattern { + t.Errorf("Expected NamePattern %s, got %s", original.NamePattern, copy.NamePattern) + } + if len(copy.Conditions) != len(original.Conditions) { + t.Errorf("Expected Conditions length %d, got %d", len(original.Conditions), len(copy.Conditions)) + } + if copy.Conditions[0].Type != original.Conditions[0].Type { + t.Errorf("Expected Condition Type %s, got %s", original.Conditions[0].Type, copy.Conditions[0].Type) + } + if copy.Conditions[0].Status != original.Conditions[0].Status { + t.Errorf("Expected Condition Status %s, got %s", original.Conditions[0].Status, copy.Conditions[0].Status) + } + + // Now change something in the original and make sure the copy is not affected + original.ResourceKind = "Deployment" + original.Conditions[0].Status = "False" + original.Conditions = append(original.Conditions, StatusCondition{Type: "Available", Status: "True"}) + + if copy.ResourceKind == original.ResourceKind { + t.Errorf("Copy was affected by change to original ResourceKind") + } + if copy.Conditions[0].Status == original.Conditions[0].Status { + t.Errorf("Copy was affected by change to original Condition Status") + } + if len(copy.Conditions) == len(original.Conditions) { + t.Errorf("Copy was affected by change to original Conditions length") + } +} diff --git a/pkg/apis/kubevent/v1alpha1/types.go b/pkg/apis/kubevent/v1alpha1/types.go index d3609bb..0dafeac 100644 --- a/pkg/apis/kubevent/v1alpha1/types.go +++ b/pkg/apis/kubevent/v1alpha1/types.go @@ -20,7 +20,12 @@ type EventTriggeredJob struct { // EventTriggeredJobSpec defines the specification for an EventTriggeredJob type EventTriggeredJobSpec struct { // EventSelector specifies which events should trigger job creation - EventSelector EventSelector `json:"eventSelector"` + // +optional + EventSelector *EventSelector `json:"eventSelector,omitempty"` + + // StatusSelector specifies which resource status conditions should trigger job creation + // +optional + StatusSelector *StatusSelector `json:"statusSelector,omitempty"` // JobTemplate is the template for the job to be created when an event is triggered JobTemplate batchv1.JobTemplateSpec `json:"jobTemplate"` @@ -63,6 +68,40 @@ type EventTriggeredJobStatus struct { // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// StatusCondition describes a condition that should match a resource's status +type StatusCondition struct { + // Type is the condition type to check (e.g., "Ready", "Available") + Type string `json:"type"` + + // Status is the status value to match (e.g., "True", "False", "Unknown") + Status string `json:"status"` + + // Operator specifies how to compare the condition (default: "Equal") + // +optional + Operator string `json:"operator,omitempty"` +} + +// StatusSelector defines criteria for selecting resources based on their status conditions +type StatusSelector struct { + // ResourceKind is the kind of the resource to watch (e.g., "Pod", "Deployment") + ResourceKind string `json:"resourceKind"` + + // NamePattern is a glob pattern to filter resource names + // +optional + NamePattern string `json:"namePattern,omitempty"` + + // NamespacePattern is a glob pattern to filter namespaces + // +optional + NamespacePattern string `json:"namespacePattern,omitempty"` + + // LabelSelector is a label selector to filter resources + // +optional + LabelSelector *metav1.LabelSelector `json:"labelSelector,omitempty"` + + // Conditions are the status conditions to match + Conditions []StatusCondition `json:"conditions"` +} + // EventTriggeredJobList contains a list of EventTriggeredJob type EventTriggeredJobList struct { metav1.TypeMeta `json:",inline"` diff --git a/pkg/apis/kubevent/v1alpha1/types_test.go b/pkg/apis/kubevent/v1alpha1/types_test.go index b7ed355..2f4b807 100644 --- a/pkg/apis/kubevent/v1alpha1/types_test.go +++ b/pkg/apis/kubevent/v1alpha1/types_test.go @@ -22,7 +22,7 @@ func TestEventTriggeredJob(t *testing.T) { Namespace: "default", }, Spec: EventTriggeredJobSpec{ - EventSelector: EventSelector{ + EventSelector: &EventSelector{ ResourceKind: "Pod", NamePattern: "web-*", NamespacePattern: "prod-*", @@ -99,6 +99,88 @@ func TestEventTriggeredJob(t *testing.T) { } } +func TestStatusTriggeredJob(t *testing.T) { + now := metav1.Now() + + template := EventTriggeredJob{ + TypeMeta: metav1.TypeMeta{ + Kind: "EventTriggeredJob", + APIVersion: "kubevent.io/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-status-template", + Namespace: "default", + }, + Spec: EventTriggeredJobSpec{ + StatusSelector: &StatusSelector{ + ResourceKind: "Pod", + NamePattern: "web-*", + NamespacePattern: "prod-*", + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "myapp", + }, + }, + Conditions: []StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + }, + JobTemplate: batchv1.JobTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "busybox", + Command: []string{"echo", "hello"}, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + }, + Status: EventTriggeredJobStatus{ + JobsCreated: 2, + LastTriggeredTime: &now, + Conditions: []metav1.Condition{ + { + Type: "Ready", + Status: metav1.ConditionTrue, + LastTransitionTime: now, + Reason: "TemplateReady", + Message: "Template is ready to process status changes", + }, + }, + }, + } + + // Verify that fields are correctly set + if template.Spec.StatusSelector.ResourceKind != "Pod" { + t.Errorf("Expected ResourceKind to be Pod, got %s", template.Spec.StatusSelector.ResourceKind) + } + + if len(template.Spec.StatusSelector.Conditions) != 1 { + t.Errorf("Expected 1 condition, got %d", len(template.Spec.StatusSelector.Conditions)) + } + + if template.Spec.StatusSelector.Conditions[0].Type != "Ready" { + t.Errorf("Expected condition Type to be Ready, got %s", template.Spec.StatusSelector.Conditions[0].Type) + } + + if template.Spec.StatusSelector.NamePattern != "web-*" { + t.Errorf("Expected NamePattern to be web-*, got %s", template.Spec.StatusSelector.NamePattern) + } +} + func TestEventTriggeredJobList(t *testing.T) { list := EventTriggeredJobList{ TypeMeta: metav1.TypeMeta{ @@ -135,12 +217,12 @@ func TestEventTriggeredJobList(t *testing.T) { func TestEventSelector(t *testing.T) { tests := []struct { name string - selector EventSelector + selector *EventSelector wantErr bool }{ { name: "valid selector", - selector: EventSelector{ + selector: &EventSelector{ ResourceKind: "Pod", NamePattern: "web-*", NamespacePattern: "prod-*", @@ -155,7 +237,7 @@ func TestEventSelector(t *testing.T) { }, { name: "empty resource kind", - selector: EventSelector{ + selector: &EventSelector{ ResourceKind: "", EventTypes: []string{"CREATE"}, }, @@ -163,7 +245,7 @@ func TestEventSelector(t *testing.T) { }, { name: "empty event types", - selector: EventSelector{ + selector: &EventSelector{ ResourceKind: "Pod", EventTypes: []string{}, }, @@ -171,7 +253,7 @@ func TestEventSelector(t *testing.T) { }, { name: "invalid event type", - selector: EventSelector{ + selector: &EventSelector{ ResourceKind: "Pod", EventTypes: []string{"INVALID"}, }, @@ -189,10 +271,69 @@ func TestEventSelector(t *testing.T) { } } +func TestStatusSelector(t *testing.T) { + tests := []struct { + name string + selector *StatusSelector + wantErr bool + }{ + { + name: "valid selector", + selector: &StatusSelector{ + ResourceKind: "Pod", + NamePattern: "web-*", + NamespacePattern: "prod-*", + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "myapp", + }, + }, + Conditions: []StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + }, + wantErr: false, + }, + { + name: "empty resource kind", + selector: &StatusSelector{ + ResourceKind: "", + Conditions: []StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + }, + wantErr: true, + }, + { + name: "empty conditions", + selector: &StatusSelector{ + ResourceKind: "Pod", + Conditions: []StatusCondition{}, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateStatusSelector(tt.selector) + if (err != nil) != tt.wantErr { + t.Errorf("validateStatusSelector() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + // validateEventSelector validates the EventSelector fields // This is a helper function for testing, in production this would be // implemented in a webhook or admission controller -func validateEventSelector(selector EventSelector) error { +func validateEventSelector(selector *EventSelector) error { if selector.ResourceKind == "" { return fmt.Errorf("resourceKind is required") } @@ -215,3 +356,18 @@ func validateEventSelector(selector EventSelector) error { return nil } + +// validateStatusSelector validates the StatusSelector fields +// This is a helper function for testing, in production this would be +// implemented in a webhook or admission controller +func validateStatusSelector(selector *StatusSelector) error { + if selector.ResourceKind == "" { + return fmt.Errorf("resourceKind is required") + } + + if len(selector.Conditions) == 0 { + return fmt.Errorf("at least one condition is required") + } + + return nil +} diff --git a/pkg/controller/event_controller.go b/pkg/controller/event_controller.go index c710933..01ae7e3 100644 --- a/pkg/controller/event_controller.go +++ b/pkg/controller/event_controller.go @@ -24,7 +24,6 @@ type EventController struct { kubeClient kubernetes.Interface workqueue workqueue.RateLimitingInterface informer cache.SharedIndexInformer - templates []v1alpha1.EventTriggeredJob } func NewEventController(kubeClient kubernetes.Interface) *EventController { @@ -54,13 +53,14 @@ func NewEventController(kubeClient kubernetes.Interface) *EventController { workqueue: workqueue, } - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + // Using AddEventHandlerWithResyncPeriod which doesn't return a value in our version + informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: controller.handleEvent, UpdateFunc: func(old, new interface{}) { controller.handleEvent(new) }, DeleteFunc: controller.handleEvent, - }) + }, 0) // Load initial templates if not in a test environment // In test environments, the RESTClient might not be fully mocked @@ -158,8 +158,14 @@ func (c *EventController) processNextItem() bool { return true } - err = c.processEvent(event.(*corev1.Event)) - if err != nil { + eventObj, ok := event.(*corev1.Event) + if !ok { + klog.Errorf("Failed to convert event object: %v", event) + c.workqueue.Forget(obj) + return true + } + + if err = c.processEvent(eventObj); err != nil { klog.Errorf("Failed to process event %s: %v", key, err) c.workqueue.AddRateLimited(obj) return true @@ -205,6 +211,11 @@ func (c *EventController) processEvent(event *corev1.Event) error { // For each template, check if it matches the event matchFound := false for _, template := range templateList.Items { + // Skip templates without an EventSelector + if template.Spec.EventSelector == nil { + continue + } + // Check if the resource kind matches if template.Spec.EventSelector.ResourceKind != event.InvolvedObject.Kind { klog.V(4).Infof("Skipping template %s: resource kind doesn't match (%s != %s)", diff --git a/pkg/controller/job_creator_test.go b/pkg/controller/job_creator_test.go index a98fcf9..4f7df7d 100644 --- a/pkg/controller/job_creator_test.go +++ b/pkg/controller/job_creator_test.go @@ -25,7 +25,7 @@ func TestCreateJobFromTemplate(t *testing.T) { UID: types.UID("test-template-uid"), }, Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ ResourceKind: "Pod", NamePattern: "test-*", NamespacePattern: "default", @@ -120,6 +120,113 @@ func TestCreateJobFromTemplate(t *testing.T) { } } +func TestCreateJobFromStatusTemplate(t *testing.T) { + // Create a fake kubernetes client + kubeClient := fake.NewSimpleClientset() + + // Create a test template + template := &v1alpha1.EventTriggeredJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-status-template", + Namespace: "default", + UID: types.UID("test-template-uid"), + }, + Spec: v1alpha1.EventTriggeredJobSpec{ + StatusSelector: &v1alpha1.StatusSelector{ + ResourceKind: "Pod", + NamePattern: "test-*", + NamespacePattern: "default", + Conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + }, + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "hello", + Image: "busybox", + Command: []string{ + "sh", + "-c", + "echo 'Resource: $RESOURCE_KIND, Name: $RESOURCE_NAME, Status: $STATUS_Ready'; sleep 5", + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + }, + } + + // Create a status controller + controller := &StatusController{ + kubeClient: kubeClient, + } + + // Test creating a job from a status template + resourceKind := "Pod" + namespace := "default" + name := "test-pod" + conditions := map[string]string{ + "Ready": "True", + } + + // Create a job based on the template and status conditions + job, err := createStatusJobFromTemplate(controller, template, resourceKind, namespace, name, conditions) + if err != nil { + t.Fatalf("Failed to create job: %v", err) + } + + // Check owner reference + if len(job.OwnerReferences) != 1 { + t.Errorf("Expected 1 owner reference, got %d", len(job.OwnerReferences)) + } + + ownerRef := job.OwnerReferences[0] + if ownerRef.UID != template.UID { + t.Errorf("Expected owner UID %s, got %s", template.UID, ownerRef.UID) + } + + if ownerRef.Name != template.Name { + t.Errorf("Expected owner name %s, got %s", template.Name, ownerRef.Name) + } + + // Check environment variable substitution + container := job.Spec.Template.Spec.Containers[0] + command := container.Command[2] + expectedCommand := "echo 'Resource: Pod, Name: test-pod, Status: True'; sleep 5" + if command != expectedCommand { + t.Errorf("Expected command with vars substituted: %s, got %s", expectedCommand, command) + } + + // Check that environment variables were added + hasResourceEnv := false + hasStatusEnv := false + for _, env := range container.Env { + if env.Name == "RESOURCE_KIND" && env.Value == "Pod" { + hasResourceEnv = true + } + if env.Name == "STATUS_Ready" && env.Value == "True" { + hasStatusEnv = true + } + } + + if !hasResourceEnv { + t.Errorf("Expected RESOURCE_KIND environment variable to be set") + } + if !hasStatusEnv { + t.Errorf("Expected STATUS_Ready environment variable to be set") + } +} + // Test implementation of createJobFromTemplate for unit testing func createJobFromTemplate(c *EventController, template *v1alpha1.EventTriggeredJob, event *corev1.Event, eventType string) (*batchv1.Job, error) { // Create a job name based on the template and event @@ -193,6 +300,88 @@ func createJobFromTemplate(c *EventController, template *v1alpha1.EventTriggered return job, nil } +// Implementation of createStatusJobFromTemplate for testing +func createStatusJobFromTemplate(c *StatusController, template *v1alpha1.EventTriggeredJob, resourceKind, namespace, name string, conditions map[string]string) (*batchv1.Job, error) { + // Create a job name based on the template + jobName := template.Name + "-" + strings.ToLower(resourceKind) + "-status" + + // Create labels for the job + labels := map[string]string{ + "kubevent-template": template.Name, + "kubevent-resource-kind": resourceKind, + "kubevent-resource-name": name, + "kubevent-trigger-type": "status", + } + + // Create a job based on the template + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: jobName + "-", + Namespace: namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "kubevent.roshanbhatia.com/v1alpha1", + Kind: "EventTriggeredJob", + Name: template.Name, + UID: template.UID, + }, + }, + }, + Spec: template.Spec.JobTemplate.Spec, + } + + // Variable substitution in command + for i, container := range job.Spec.Template.Spec.Containers { + for j, command := range container.Command { + // Replace variables in command + command = replaceStatusVariables(command, resourceKind, name, namespace, conditions) + job.Spec.Template.Spec.Containers[i].Command[j] = command + } + + // Add environment variables if they don't exist + envVars := []corev1.EnvVar{ + { + Name: "RESOURCE_KIND", + Value: resourceKind, + }, + { + Name: "RESOURCE_NAME", + Value: name, + }, + { + Name: "RESOURCE_NAMESPACE", + Value: namespace, + }, + { + Name: "TRIGGER_TYPE", + Value: "status", + }, + } + + // Add condition environment variables + for condType, condStatus := range conditions { + envVarName := "STATUS_" + strings.ReplaceAll(condType, "-", "_") + envVars = append(envVars, corev1.EnvVar{ + Name: envVarName, + Value: condStatus, + }) + } + + // Add environment variables if they don't already exist + for _, env := range envVars { + if !envVarExists(container.Env, env.Name) { + job.Spec.Template.Spec.Containers[i].Env = append( + job.Spec.Template.Spec.Containers[i].Env, + env, + ) + } + } + } + + return job, nil +} + // Helper function to replace variables in a string func replaceVariables(input string, event *corev1.Event, eventType string) string { // Replace $RESOURCE_KIND with event.InvolvedObject.Kind @@ -210,6 +399,26 @@ func replaceVariables(input string, event *corev1.Event, eventType string) strin return output } +// Helper function to replace variables in a string for status jobs +func replaceStatusVariables(input, resourceKind, name, namespace string, conditions map[string]string) string { + // Replace $RESOURCE_KIND with resourceKind + output := strings.Replace(input, "$RESOURCE_KIND", resourceKind, -1) + + // Replace $RESOURCE_NAME with name + output = strings.Replace(output, "$RESOURCE_NAME", name, -1) + + // Replace $RESOURCE_NAMESPACE with namespace + output = strings.Replace(output, "$RESOURCE_NAMESPACE", namespace, -1) + + // Replace condition variables + for condType, condStatus := range conditions { + varName := "$STATUS_" + strings.ReplaceAll(condType, "-", "_") + output = strings.Replace(output, varName, condStatus, -1) + } + + return output +} + // Helper function to check if an environment variable already exists func envVarExists(envVars []corev1.EnvVar, name string) bool { for _, env := range envVars { diff --git a/pkg/controller/status_controller.go b/pkg/controller/status_controller.go new file mode 100644 index 0000000..61e0dd4 --- /dev/null +++ b/pkg/controller/status_controller.go @@ -0,0 +1,561 @@ +package controller + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/roshbhatia/kubevent/pkg/apis/kubevent/v1alpha1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +// StatusController watches resource status changes and triggers jobs when conditions match +type StatusController struct { + kubeClient kubernetes.Interface + dynamicClient dynamic.Interface + workqueue workqueue.RateLimitingInterface + informers map[schema.GroupVersionKind]cache.SharedIndexInformer + templates []v1alpha1.EventTriggeredJob + resourceStatus map[string]map[string]string // Tracks resource statuses +} + +// NewStatusController creates a new StatusController +func NewStatusController(kubeClient kubernetes.Interface, dynamicClient dynamic.Interface) *StatusController { + controller := &StatusController{ + kubeClient: kubeClient, + dynamicClient: dynamicClient, + workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + informers: make(map[schema.GroupVersionKind]cache.SharedIndexInformer), + resourceStatus: make(map[string]map[string]string), + } + + // Load initial templates if not in a test environment + _, isTest := os.LookupEnv("TEST_MODE") + if !isTest { + controller.refreshTemplates() + } + + return controller +} + +// refreshTemplates loads all EventTriggeredJobs with StatusSelector +func (c *StatusController) refreshTemplates() { + // Skip for testing when kubeClient might not be fully initialized + if c.kubeClient == nil { + klog.Warningf("Skipping refresh templates, kubeClient is nil") + return + } + + templateList := &v1alpha1.EventTriggeredJobList{} + err := c.kubeClient.CoreV1().RESTClient(). + Get(). + AbsPath("/apis/kubevent.roshanbhatia.com/v1alpha1/eventtriggeredjobs"). + Do(context.Background()). + Into(templateList) + + if err != nil { + klog.Errorf("Failed to get templates: %v", err) + return + } + + // Filter templates that have a StatusSelector + c.templates = []v1alpha1.EventTriggeredJob{} + watchedKinds := make(map[string]bool) + + for _, template := range templateList.Items { + if template.Spec.StatusSelector != nil { + c.templates = append(c.templates, template) + watchedKinds[template.Spec.StatusSelector.ResourceKind] = true + } + } + + // Setup informers for each kind of resource we need to watch + for kind := range watchedKinds { + c.setupInformerForKind(kind) + } + + klog.Infof("Loaded %d status-based templates", len(c.templates)) +} + +// setupInformerForKind creates an informer for a specific resource kind +func (c *StatusController) setupInformerForKind(kind string) { + // This is a simplified version - in a real implementation, + // you'd need to get the proper GVK for the resource + gvk := schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: kind, + } + + // Check if we already have an informer for this GVK + if _, exists := c.informers[gvk]; exists { + return + } + + // Create a dynamic informer for the resource kind + resource := kindToResource(kind) + + // Create a dynamic list/watch for the resource + listFunc := func(options metav1.ListOptions) (runtime.Object, error) { + return c.dynamicClient.Resource(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: resource, + }).Namespace("").List(context.Background(), options) + } + + watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { + return c.dynamicClient.Resource(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: resource, + }).Namespace("").Watch(context.Background(), options) + } + + informer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: listFunc, + WatchFunc: watchFunc, + }, + &unstructured.Unstructured{}, + 0, + cache.Indexers{}, + ) + + // Add event handlers + // Using AddEventHandlerWithResyncPeriod which doesn't return a value in our version + informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: c.handleObject, + UpdateFunc: func(old, new interface{}) { + c.handleObject(new) + }, + DeleteFunc: c.handleObject, + }, 0) + + c.informers[gvk] = informer + klog.Infof("Set up informer for resource kind: %s", kind) +} + +// Run starts the controller +func (c *StatusController) Run(workers int, stopCh <-chan struct{}) error { + defer c.workqueue.ShutDown() + + klog.Info("Starting status controller") + + // Start all the informers + for gvk, informer := range c.informers { + klog.Infof("Starting informer for %s", gvk.String()) + go informer.Run(stopCh) + } + + // Wait for all informers to sync + for gvk, informer := range c.informers { + klog.Infof("Waiting for informer for %s to sync", gvk.String()) + if !cache.WaitForCacheSync(stopCh, informer.HasSynced) { + return fmt.Errorf("failed to wait for caches to sync") + } + } + + klog.Info("Status controller synced and ready") + + for i := 0; i < workers; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-stopCh + klog.Info("Shutting down status controller") + return nil +} + +func (c *StatusController) runWorker() { + for c.workqueue.Len() > 0 { + if !c.processNextItem() { + return + } + } +} + +func (c *StatusController) processNextItem() bool { + obj, shutdown := c.workqueue.Get() + if shutdown { + return false + } + + defer c.workqueue.Done(obj) + + key, ok := obj.(string) + if !ok { + klog.Errorf("Expected string in workqueue but got %#v", obj) + c.workqueue.Forget(obj) + return true + } + + // Process the resource status change + if err := c.processStatusChange(key); err != nil { + klog.Errorf("Error processing status change for key %s: %v", key, err) + c.workqueue.AddRateLimited(key) + return true + } + + c.workqueue.Forget(obj) + return true +} + +func (c *StatusController) handleObject(obj interface{}) { + // Ensure we have a valid object + _, ok := obj.(metav1.Object) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Error decoding object, invalid type") + return + } + _, ok = tombstone.Obj.(metav1.Object) + if !ok { + klog.Errorf("Error decoding object tombstone, invalid type") + return + } + } + + // Get the key to put in the queue + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + klog.Errorf("Failed to get key from object: %v", err) + return + } + + // Get the current object from the unstructured data + unstructuredObj, ok := obj.(*unstructured.Unstructured) + if !ok { + klog.Errorf("Expected unstructured object, got %T", obj) + return + } + + // Extract conditions from status + conditions, found, err := unstructured.NestedSlice(unstructuredObj.Object, "status", "conditions") + if err != nil || !found { + // No conditions found, but not an error - just means this object might not have conditions + c.workqueue.Add(key) + return + } + + // Parse conditions and check if they've changed + changed := false + currentStatus := make(map[string]string) + + // Try to extract conditions - different resources may store conditions differently + for _, cond := range conditions { + condition, ok := cond.(map[string]interface{}) + if !ok { + continue + } + + condType, typeFound := condition["type"].(string) + condStatus, statusFound := condition["status"].(string) + + if typeFound && statusFound { + currentStatus[condType] = condStatus + } + } + + // Check if status has changed + previousStatus, exists := c.resourceStatus[key] + if !exists || !statusEqual(previousStatus, currentStatus) { + changed = true + c.resourceStatus[key] = currentStatus + } + + if changed { + // Add to workqueue for processing + c.workqueue.Add(key) + } +} + +// statusEqual checks if two status maps are equal +func statusEqual(old, new map[string]string) bool { + if len(old) != len(new) { + return false + } + + for k, v := range old { + if newV, ok := new[k]; !ok || newV != v { + return false + } + } + + return true +} + +// processStatusChange processes a status change and triggers jobs if templates match +func (c *StatusController) processStatusChange(key string) error { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return fmt.Errorf("invalid resource key: %s", key) + } + + // Get the resource from the informer + var obj runtime.Object + var resourceKind string + + // Find the right informer based on the resource kind + // This is simplified - you'd need to identify the correct GVK + for gvk, informer := range c.informers { + if item, exists, err := informer.GetStore().GetByKey(key); err == nil && exists { + var ok bool + if obj, ok = item.(runtime.Object); ok { + resourceKind = gvk.Kind + break + } + } + } + + if obj == nil { + // Object may have been deleted, clean up our status tracking + delete(c.resourceStatus, key) + return nil + } + + // Convert to unstructured to access fields + unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return fmt.Errorf("failed to convert object to unstructured: %w", err) + } + + // For testing, skip the template retrieval + if os.Getenv("TEST_MODE") == "true" { + klog.V(4).Infof("Running in test mode, skipping template retrieval") + return nil + } + + // Here we would normally get labels for label selector matching + // This will be implemented later when we fully support label selectors + /* + objMeta, err := meta.Accessor(obj) + if err != nil { + return fmt.Errorf("failed to get object metadata: %w", err) + } + labels := objMeta.GetLabels() // Will use this with selectors + */ + + // Get status conditions + conditions, found, err := unstructured.NestedSlice(unstructuredObj, "status", "conditions") + if err != nil { + return fmt.Errorf("failed to get conditions: %w", err) + } + + if !found || len(conditions) == 0 { + // No conditions found, nothing to process + return nil + } + + // Process conditions + conditionMap := make(map[string]string) + for _, cond := range conditions { + condition, ok := cond.(map[string]interface{}) + if !ok { + continue + } + + condType, typeFound := condition["type"].(string) + condStatus, statusFound := condition["status"].(string) + + if typeFound && statusFound { + conditionMap[condType] = condStatus + } + } + + // Check each template for a match + for _, template := range c.templates { + // Skip templates without a StatusSelector + if template.Spec.StatusSelector == nil { + continue + } + + // Check if resource kind matches + if template.Spec.StatusSelector.ResourceKind != resourceKind { + continue + } + + // Check name pattern if specified + if template.Spec.StatusSelector.NamePattern != "" { + if !matchNamePattern(template.Spec.StatusSelector.NamePattern, name) { + continue + } + } + + // Check namespace pattern if specified + if template.Spec.StatusSelector.NamespacePattern != "" { + if !matchNamePattern(template.Spec.StatusSelector.NamespacePattern, namespace) { + continue + } + } + + // TODO: Implement label selector matching if needed + if template.Spec.StatusSelector.LabelSelector != nil { + // Skip for now - in a real implementation you'd check label selectors + // This requires proper label selector matching + klog.V(4).Infof("Label selector matching not implemented yet") + } + + // Check if conditions match + conditionsMatch := true + for _, requiredCond := range template.Spec.StatusSelector.Conditions { + actualStatus, exists := conditionMap[requiredCond.Type] + if !exists || actualStatus != requiredCond.Status { + conditionsMatch = false + break + } + } + + if !conditionsMatch { + continue + } + + // Template matched, create a job + klog.Infof("Template %s matched status conditions for %s/%s, creating job", + template.Name, resourceKind, name) + + // Create job based on the template + if err := c.createJobFromTemplate(&template, resourceKind, namespace, name, conditionMap); err != nil { + klog.Errorf("Failed to create job from template %s: %v", template.Name, err) + continue + } + } + + return nil +} + +// createJobFromTemplate creates a job based on a template when status conditions match +func (c *StatusController) createJobFromTemplate( + template *v1alpha1.EventTriggeredJob, + resourceKind, namespace, name string, + conditions map[string]string) error { + + // Create job name based on template name + jobName := fmt.Sprintf("%s-%s-%s", + template.Name, + strings.ToLower(resourceKind), + "status") + + // Create labels for the job + labels := map[string]string{ + "kubevent-template": template.Name, + "kubevent-resource-kind": resourceKind, + "kubevent-resource-name": name, + "kubevent-trigger-type": "status", + } + + // Create a job from the template + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: jobName + "-", + Namespace: namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "kubevent.roshanbhatia.com/v1alpha1", + Kind: "EventTriggeredJob", + Name: template.Name, + UID: template.UID, + Controller: &[]bool{true}[0], + }, + }, + }, + Spec: template.Spec.JobTemplate.Spec, + } + + // Apply variable substitution to the job spec + for i, container := range job.Spec.Template.Spec.Containers { + for j, cmd := range container.Command { + job.Spec.Template.Spec.Containers[i].Command[j] = substituteStatusVariables(cmd, resourceKind, name, namespace, conditions) + } + + // Add environment variables for the resource + envVars := []corev1.EnvVar{ + {Name: "RESOURCE_KIND", Value: resourceKind}, + {Name: "RESOURCE_NAME", Value: name}, + {Name: "RESOURCE_NAMESPACE", Value: namespace}, + {Name: "TRIGGER_TYPE", Value: "status"}, + } + + // Add condition environment variables + for condType, condStatus := range conditions { + envVarName := fmt.Sprintf("STATUS_%s", strings.ReplaceAll(condType, "-", "_")) + envVars = append(envVars, corev1.EnvVar{Name: envVarName, Value: condStatus}) + } + + // Add environment variables to the container + for _, env := range envVars { + // Check if the env var already exists + exists := false + for _, existingEnv := range container.Env { + if existingEnv.Name == env.Name { + exists = true + break + } + } + + if !exists { + job.Spec.Template.Spec.Containers[i].Env = append( + job.Spec.Template.Spec.Containers[i].Env, env) + } + } + } + + // Create the job + createdJob, err := c.kubeClient.BatchV1().Jobs(namespace).Create(context.Background(), job, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create job: %w", err) + } + + klog.Infof("Created job %s/%s for status match", createdJob.Namespace, createdJob.Name) + return nil +} + +// substituteStatusVariables substitutes variables in a string for status-triggered jobs +func substituteStatusVariables(input, resourceKind, name, namespace string, conditions map[string]string) string { + // Replace $RESOURCE_KIND with resourceKind + input = strings.ReplaceAll(input, "$RESOURCE_KIND", resourceKind) + + // Replace $RESOURCE_NAME with name + input = strings.ReplaceAll(input, "$RESOURCE_NAME", name) + + // Replace $RESOURCE_NAMESPACE with namespace + input = strings.ReplaceAll(input, "$RESOURCE_NAMESPACE", namespace) + + // Replace $STATUS_X with condition values + for condType, condStatus := range conditions { + varName := fmt.Sprintf("$STATUS_%s", strings.ReplaceAll(condType, "-", "_")) + input = strings.ReplaceAll(input, varName, condStatus) + } + + return input +} + +// kindToResource converts Kind to resource name (pluralized lowercase) +// This is a simplified version - a real implementation would use schema discovery +func kindToResource(kind string) string { + resource := strings.ToLower(kind) + // Simple pluralization + if strings.HasSuffix(resource, "y") { + return resource[:len(resource)-1] + "ies" + } + if strings.HasSuffix(resource, "s") { + return resource + } + return resource + "s" +} diff --git a/pkg/controller/status_controller_test.go b/pkg/controller/status_controller_test.go new file mode 100644 index 0000000..bdac1fb --- /dev/null +++ b/pkg/controller/status_controller_test.go @@ -0,0 +1,318 @@ +package controller + +import ( + "context" + "testing" + + "github.com/roshbhatia/kubevent/pkg/apis/kubevent/v1alpha1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + dynamicfake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +func TestNewStatusController(t *testing.T) { + // Create fake clients + kubeClient := fake.NewSimpleClientset() + scheme := runtime.NewScheme() + dynamicClient := dynamicfake.NewSimpleDynamicClient(scheme) + + // Create a new status controller + controller := NewStatusController(kubeClient, dynamicClient) + + // Check if the controller is properly initialized + if controller.kubeClient != kubeClient { + t.Errorf("Expected kubeClient to be set correctly") + } + + if controller.dynamicClient != dynamicClient { + t.Errorf("Expected dynamicClient to be set correctly") + } + + if controller.workqueue == nil { + t.Errorf("Expected workqueue to be initialized") + } + + if controller.informers == nil { + t.Errorf("Expected informers map to be initialized") + } + + if controller.resourceStatus == nil { + t.Errorf("Expected resourceStatus map to be initialized") + } +} + +// Simple test for status-based job creation logic +func TestCreateJobForStatusChange(t *testing.T) { + // Create fake clients + kubeClient := fake.NewSimpleClientset() + scheme := runtime.NewScheme() + dynamicClient := dynamicfake.NewSimpleDynamicClient(scheme) + + // Create a status controller + controller := &StatusController{ + kubeClient: kubeClient, + dynamicClient: dynamicClient, + workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + informers: make(map[schema.GroupVersionKind]cache.SharedIndexInformer), + resourceStatus: make(map[string]map[string]string), + } + + // Create a test template + template := &v1alpha1.EventTriggeredJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-status-template", + Namespace: "default", + UID: "test-template-uid", + }, + Spec: v1alpha1.EventTriggeredJobSpec{ + StatusSelector: &v1alpha1.StatusSelector{ + ResourceKind: "Pod", + NamePattern: "test-*", + NamespacePattern: "default", + Conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + }, + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "hello", + Image: "busybox", + Command: []string{ + "sh", + "-c", + "echo 'Resource is ready'; sleep 5", + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + }, + } + + // Set up test data + resourceKind := "Pod" + namespace := "default" + name := "test-pod" + conditions := map[string]string{ + "Ready": "True", + } + + // Store a resource status + controller.resourceStatus["default/test-pod"] = conditions + + // Store the template + controller.templates = []v1alpha1.EventTriggeredJob{*template} + + // Test the job creation method directly + err := controller.createJobFromTemplate(template, resourceKind, namespace, name, conditions) + if err != nil { + t.Fatalf("Failed to create job: %v", err) + } + + // Verify job was created + jobs, err := kubeClient.BatchV1().Jobs(namespace).List(context.Background(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list jobs: %v", err) + } + + if len(jobs.Items) != 1 { + t.Errorf("Expected 1 job to be created, got %d", len(jobs.Items)) + } + + // Verify job details + job := jobs.Items[0] + if !isOwner(template.UID, job.OwnerReferences) { + t.Errorf("Expected job to have the template as owner") + } + + // Verify container has environment variables + container := job.Spec.Template.Spec.Containers[0] + foundReadyEnv := false + for _, env := range container.Env { + if env.Name == "STATUS_Ready" && env.Value == "True" { + foundReadyEnv = true + break + } + } + + if !foundReadyEnv { + t.Errorf("Expected job to have STATUS_Ready environment variable") + } +} + +// Helper function to check if a UID is in owner references +func isOwner(uid types.UID, ownerRefs []metav1.OwnerReference) bool { + for _, ref := range ownerRefs { + if ref.UID == uid { + return true + } + } + return false +} + +// Test status condition matching +func TestStatusConditionMatch(t *testing.T) { + // Create test conditions map + conditions := map[string]string{ + "Ready": "True", + "Available": "True", + "Progressing": "False", + } + + // Test cases + tests := []struct { + name string + conditions []v1alpha1.StatusCondition + shouldMatch bool + }{ + { + name: "single matching condition", + conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + shouldMatch: true, + }, + { + name: "multiple matching conditions", + conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + { + Type: "Available", + Status: "True", + }, + }, + shouldMatch: true, + }, + { + name: "one mismatched condition", + conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + { + Type: "Progressing", + Status: "True", // Actual is False + }, + }, + shouldMatch: false, + }, + { + name: "missing condition", + conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + { + Type: "Missing", + Status: "True", + }, + }, + shouldMatch: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Check if conditions match + match := true + for _, requiredCond := range tt.conditions { + actualStatus, exists := conditions[requiredCond.Type] + if !exists || actualStatus != requiredCond.Status { + match = false + break + } + } + + if match != tt.shouldMatch { + t.Errorf("Expected match to be %v, got %v", tt.shouldMatch, match) + } + }) + } +} + +// Test status change detection +func TestStatusChangeDetection(t *testing.T) { + // Create a controller + controller := &StatusController{ + resourceStatus: make(map[string]map[string]string), + } + + // Initial status + key := "default/test-pod" + initialStatus := map[string]string{ + "Ready": "False", + } + + // Store initial status + controller.resourceStatus[key] = initialStatus + + // Check if status changed + newStatus := map[string]string{ + "Ready": "True", + } + + // Clone the status to avoid modifying the original + changed := true + prevStatus, exists := controller.resourceStatus[key] + if exists && areStatusesEqual(prevStatus, newStatus) { + changed = false + } + + if !changed { + t.Errorf("Expected status to be detected as changed") + } + + // Update status + controller.resourceStatus[key] = newStatus + + // Check again with same status + changed = true + prevStatus, exists = controller.resourceStatus[key] + if exists && areStatusesEqual(prevStatus, newStatus) { + changed = false + } + + if changed { + t.Errorf("Expected status to be detected as unchanged") + } +} + +// Implement helper function for test +func areStatusesEqual(old, new map[string]string) bool { + if len(old) != len(new) { + return false + } + + for k, v := range old { + if newV, ok := new[k]; !ok || newV != v { + return false + } + } + + return true +} diff --git a/pkg/controller/template_matcher_test.go b/pkg/controller/template_matcher_test.go index f6a947b..03d5c03 100644 --- a/pkg/controller/template_matcher_test.go +++ b/pkg/controller/template_matcher_test.go @@ -19,7 +19,7 @@ func TestMatchResourceKind(t *testing.T) { name: "exact match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ ResourceKind: "Pod", }, }, @@ -31,7 +31,7 @@ func TestMatchResourceKind(t *testing.T) { name: "no match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ ResourceKind: "Pod", }, }, @@ -43,7 +43,7 @@ func TestMatchResourceKind(t *testing.T) { name: "case sensitive match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ ResourceKind: "Pod", }, }, @@ -74,7 +74,7 @@ func TestMatchNamePattern(t *testing.T) { name: "exact match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ NamePattern: "test-pod", }, }, @@ -86,7 +86,7 @@ func TestMatchNamePattern(t *testing.T) { name: "glob match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ NamePattern: "test-*", }, }, @@ -98,7 +98,7 @@ func TestMatchNamePattern(t *testing.T) { name: "no match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ NamePattern: "web-*", }, }, @@ -110,7 +110,7 @@ func TestMatchNamePattern(t *testing.T) { name: "empty pattern should match anything", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ NamePattern: "", }, }, @@ -141,7 +141,7 @@ func TestMatchNamespacePattern(t *testing.T) { name: "exact match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ NamespacePattern: "default", }, }, @@ -153,7 +153,7 @@ func TestMatchNamespacePattern(t *testing.T) { name: "glob match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ NamespacePattern: "prod-*", }, }, @@ -165,7 +165,7 @@ func TestMatchNamespacePattern(t *testing.T) { name: "no match", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ NamespacePattern: "prod-*", }, }, @@ -177,7 +177,7 @@ func TestMatchNamespacePattern(t *testing.T) { name: "empty pattern should match anything", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ NamespacePattern: "", }, }, @@ -208,7 +208,7 @@ func TestMatchLabelSelector(t *testing.T) { name: "match with labels", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "app": "nginx", @@ -227,7 +227,7 @@ func TestMatchLabelSelector(t *testing.T) { name: "no match with labels", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "app": "nginx", @@ -246,7 +246,7 @@ func TestMatchLabelSelector(t *testing.T) { name: "no labels on resource", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ LabelSelector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "app": "nginx", @@ -262,7 +262,7 @@ func TestMatchLabelSelector(t *testing.T) { name: "no selector on template", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ LabelSelector: nil, }, }, @@ -296,7 +296,7 @@ func TestMatchEventType(t *testing.T) { name: "match CREATE event", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ EventTypes: []string{"CREATE", "DELETE"}, }, }, @@ -308,7 +308,7 @@ func TestMatchEventType(t *testing.T) { name: "match DELETE event", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ EventTypes: []string{"CREATE", "DELETE"}, }, }, @@ -320,7 +320,7 @@ func TestMatchEventType(t *testing.T) { name: "no match UPDATE event", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ EventTypes: []string{"CREATE", "DELETE"}, }, }, @@ -332,7 +332,7 @@ func TestMatchEventType(t *testing.T) { name: "empty event types should not match anything", template: v1alpha1.EventTriggeredJob{ Spec: v1alpha1.EventTriggeredJobSpec{ - EventSelector: v1alpha1.EventSelector{ + EventSelector: &v1alpha1.EventSelector{ EventTypes: []string{}, }, }, @@ -352,12 +352,131 @@ func TestMatchEventType(t *testing.T) { } } +// Test Status Condition Matching +func TestMatchStatusConditions(t *testing.T) { + tests := []struct { + name string + template v1alpha1.EventTriggeredJob + conditions map[string]string + shouldMatch bool + }{ + { + name: "single condition match", + template: v1alpha1.EventTriggeredJob{ + Spec: v1alpha1.EventTriggeredJobSpec{ + StatusSelector: &v1alpha1.StatusSelector{ + Conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + }, + }, + }, + conditions: map[string]string{ + "Ready": "True", + }, + shouldMatch: true, + }, + { + name: "multiple conditions all match", + template: v1alpha1.EventTriggeredJob{ + Spec: v1alpha1.EventTriggeredJobSpec{ + StatusSelector: &v1alpha1.StatusSelector{ + Conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + { + Type: "PodScheduled", + Status: "True", + }, + }, + }, + }, + }, + conditions: map[string]string{ + "Ready": "True", + "PodScheduled": "True", + "Available": "True", // Extra condition should be ignored + }, + shouldMatch: true, + }, + { + name: "one condition doesn't match", + template: v1alpha1.EventTriggeredJob{ + Spec: v1alpha1.EventTriggeredJobSpec{ + StatusSelector: &v1alpha1.StatusSelector{ + Conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + { + Type: "PodScheduled", + Status: "True", + }, + }, + }, + }, + }, + conditions: map[string]string{ + "Ready": "True", + "PodScheduled": "False", + }, + shouldMatch: false, + }, + { + name: "missing condition", + template: v1alpha1.EventTriggeredJob{ + Spec: v1alpha1.EventTriggeredJobSpec{ + StatusSelector: &v1alpha1.StatusSelector{ + Conditions: []v1alpha1.StatusCondition{ + { + Type: "Ready", + Status: "True", + }, + { + Type: "PodScheduled", + Status: "True", + }, + }, + }, + }, + }, + conditions: map[string]string{ + "Ready": "True", + // PodScheduled is missing + }, + shouldMatch: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + match := testMatchStatusConditions(&tt.template, tt.conditions) + if match != tt.shouldMatch { + t.Errorf("testMatchStatusConditions() = %v, want %v", match, tt.shouldMatch) + } + }) + } +} + // These are the functions we're testing func testMatchResourceKind(template *v1alpha1.EventTriggeredJob, resourceKind string) bool { + if template.Spec.EventSelector == nil { + return false + } return template.Spec.EventSelector.ResourceKind == resourceKind } func testMatchNamePattern(template *v1alpha1.EventTriggeredJob, resourceName string) bool { + if template.Spec.EventSelector == nil { + return false + } + pattern := template.Spec.EventSelector.NamePattern if pattern == "" { return true // Empty pattern matches anything @@ -370,6 +489,10 @@ func testMatchNamePattern(template *v1alpha1.EventTriggeredJob, resourceName str } func testMatchNamespacePattern(template *v1alpha1.EventTriggeredJob, resourceNamespace string) bool { + if template.Spec.EventSelector == nil { + return false + } + pattern := template.Spec.EventSelector.NamespacePattern if pattern == "" { return true // Empty pattern matches anything @@ -382,6 +505,10 @@ func testMatchNamespacePattern(template *v1alpha1.EventTriggeredJob, resourceNam } func testMatchLabelSelector(template *v1alpha1.EventTriggeredJob, resourceLabels labels.Labels) bool { + if template.Spec.EventSelector == nil { + return false + } + if template.Spec.EventSelector.LabelSelector == nil { return true // No label selector matches anything } @@ -395,6 +522,10 @@ func testMatchLabelSelector(template *v1alpha1.EventTriggeredJob, resourceLabels } func testMatchEventType(template *v1alpha1.EventTriggeredJob, eventType string) bool { + if template.Spec.EventSelector == nil { + return false + } + for _, t := range template.Spec.EventSelector.EventTypes { if t == eventType { return true @@ -402,3 +533,18 @@ func testMatchEventType(template *v1alpha1.EventTriggeredJob, eventType string) } return false } + +func testMatchStatusConditions(template *v1alpha1.EventTriggeredJob, conditions map[string]string) bool { + if template.Spec.StatusSelector == nil { + return false + } + + for _, requiredCond := range template.Spec.StatusSelector.Conditions { + actualStatus, exists := conditions[requiredCond.Type] + if !exists || actualStatus != requiredCond.Status { + return false + } + } + + return true +}