diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 8d1956fe18..9bcd72673d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -4,20 +4,13 @@ import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.Version; -import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.CustomResourceCache; -import io.javaoperatorsdk.operator.processing.DefaultEventHandler; -import io.javaoperatorsdk.operator.processing.EventDispatcher; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; -import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +27,8 @@ public Operator(KubernetesClient k8sClient, ConfigurationService configurationSe this.k8sClient = k8sClient; this.configurationService = configurationService; this.closeables = new ArrayList<>(); + + Runtime.getRuntime().addShutdownHook(new Thread(this::close)); } /** @@ -115,18 +110,7 @@ public void register( configuration = existing; } - final var retry = GenericRetry.fromConfiguration(configuration.getRetryConfiguration()); - - // check if we only want to watch the current namespace - var targetNamespaces = configuration.getNamespaces().toArray(new String[] {}); - if (configuration.watchCurrentNamespace()) { - targetNamespaces = - new String[] {configurationService.getClientConfiguration().getNamespace()}; - } - Class resClass = configuration.getCustomResourceClass(); - String finalizer = configuration.getFinalizer(); - final String controllerName = configuration.getName(); // check that the custom resource is known by the cluster if configured that way @@ -148,61 +132,18 @@ public void register( } final var client = k8sClient.customResources(resClass); - EventDispatcher dispatcher = new EventDispatcher<>(controller, finalizer, client); - - CustomResourceCache customResourceCache = - new CustomResourceCache(configurationService.getObjectMapper()); - DefaultEventHandler defaultEventHandler = - new DefaultEventHandler( - customResourceCache, - dispatcher, - controllerName, - retry, - configurationService.concurrentReconciliationThreads()); DefaultEventSourceManager eventSourceManager = - new DefaultEventSourceManager(defaultEventHandler, retry != null); - defaultEventHandler.setEventSourceManager(eventSourceManager); - dispatcher.setEventSourceManager(eventSourceManager); - + new DefaultEventSourceManager(controller, configuration, client); controller.init(eventSourceManager); - final boolean watchAllNamespaces = configuration.watchAllNamespaces(); - CustomResourceEventSource customResourceEventSource = - createCustomResourceEventSource( - client, - customResourceCache, - watchAllNamespaces, - targetNamespaces, - configuration.isGenerationAware(), - finalizer, - resClass); - - closeables.add(customResourceEventSource); closeables.add(eventSourceManager); - customResourceEventSource.setEventHandler(defaultEventHandler); - customResourceEventSource.start(); - log.info( "Registered Controller: '{}' for CRD: '{}' for namespace(s): {}", controllerName, resClass, - watchAllNamespaces ? "[all namespaces]" : Arrays.toString(targetNamespaces)); + configuration.watchAllNamespaces() + ? "[all namespaces]" + : configuration.getEffectiveNamespaces()); } } - - private CustomResourceEventSource createCustomResourceEventSource( - MixedOperation client, - CustomResourceCache customResourceCache, - boolean watchAllNamespaces, - String[] targetNamespaces, - boolean generationAware, - String finalizer, - Class resClass) { - - return watchAllNamespaces - ? CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, client, generationAware, finalizer, resClass) - : CustomResourceEventSource.customResourceEventSourceForTargetNamespaces( - customResourceCache, client, targetNamespaces, generationAware, finalizer, resClass); - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java index 7d04a5db93..10cba18985 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java @@ -2,13 +2,10 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.EventList; -import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import java.util.Optional; public interface Context { - EventSourceManager getEventSourceManager(); - EventList getEvents(); Optional getRetryInfo(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java index c922d674da..d650b8aa3b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java @@ -2,25 +2,16 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.EventList; -import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import java.util.Optional; public class DefaultContext implements Context { private final RetryInfo retryInfo; private final EventList events; - private final EventSourceManager eventSourceManager; - public DefaultContext( - EventSourceManager eventSourceManager, EventList events, RetryInfo retryInfo) { + public DefaultContext(EventList events, RetryInfo retryInfo) { this.retryInfo = retryInfo; this.events = events; - this.eventSourceManager = eventSourceManager; - } - - @Override - public EventSourceManager getEventSourceManager() { - return eventSourceManager; } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java index de83fd9b36..0990907743 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java @@ -35,6 +35,7 @@ private void put( } } configurations.put(name, config); + config.setConfigurationService(this); } protected void throwExceptionOnNameCollision( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index a0fb7616f2..599ff46533 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -41,6 +41,15 @@ static boolean currentNamespaceWatched(Set namespaces) { && namespaces.contains(Controller.WATCH_CURRENT_NAMESPACE); } + default Set getEffectiveNamespaces() { + var targetNamespaces = getNamespaces(); + if (watchCurrentNamespace()) { + targetNamespaces = + Collections.singleton(getConfigurationService().getClientConfiguration().getNamespace()); + } + return targetNamespaces; + } + default RetryConfiguration getRetryConfiguration() { return RetryConfiguration.DEFAULT; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index be8c82af02..10743fba7f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -70,6 +70,11 @@ public ControllerConfiguration build() { public Class getCustomResourceClass() { return original.getCustomResourceClass(); } + + @Override + public ConfigurationService getConfigurationService() { + return original.getConfigurationService(); + } }; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 107a94e426..a082e770b0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -5,11 +5,14 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.RetryInfo; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; +import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import java.util.HashMap; @@ -18,6 +21,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +34,6 @@ public class DefaultEventHandler implements EventHandler { private static final Logger log = LoggerFactory.getLogger(DefaultEventHandler.class); - private final CustomResourceCache customResourceCache; private final EventBuffer eventBuffer; private final Set underProcessing = new HashSet<>(); private final ScheduledThreadPoolExecutor executor; @@ -43,12 +46,19 @@ public class DefaultEventHandler implements EventHandler { private final ReentrantLock lock = new ReentrantLock(); public DefaultEventHandler( - CustomResourceCache customResourceCache, + ResourceController controller, ControllerConfiguration configuration, MixedOperation client) { + this( + new EventDispatcher(controller, configuration.getFinalizer(), client), + configuration.getName(), + GenericRetry.fromConfiguration(configuration.getRetryConfiguration()), + configuration.getConfigurationService().concurrentReconciliationThreads()); + } + + DefaultEventHandler( EventDispatcher eventDispatcher, String relatedControllerName, Retry retry, int concurrentReconciliationThreads) { - this.customResourceCache = customResourceCache; this.eventDispatcher = eventDispatcher; this.retry = retry; this.controllerName = relatedControllerName; @@ -59,27 +69,14 @@ public DefaultEventHandler( runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName)); } - public DefaultEventHandler( - CustomResourceCache customResourceCache, - EventDispatcher eventDispatcher, - String relatedControllerName, - Retry retry) { - this( - customResourceCache, - eventDispatcher, - relatedControllerName, - retry, - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); - } - @Override public void close() { - if (eventSourceManager != null) { - log.debug("Closing EventSourceManager {} -> {}", controllerName, eventSourceManager); - eventSourceManager.close(); + try { + log.debug("Closing handler for {}", controllerName); + executor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.debug("Exception closing handler for {}: {}", controllerName, e.getLocalizedMessage()); } - - executor.shutdownNow(); } public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { @@ -102,7 +99,7 @@ private void executeBufferedEvents(String customResourceUid) { boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid); boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid); Optional latestCustomResource = - customResourceCache.getLatestResource(customResourceUid); + eventSourceManager.getLatestResource(customResourceUid); if (!controllerUnderExecution && newEventForResourceId && latestCustomResource.isPresent()) { setUnderExecutionProcessing(customResourceUid); @@ -234,7 +231,7 @@ private void cacheUpdatedResourceIfChanged( getUID(originalCustomResource), getVersion(customResourceAfterExecution), getVersion(originalCustomResource)); - this.customResourceCache.cacheResource( + eventSourceManager.cacheResource( customResourceAfterExecution, customResource -> getVersion(customResource).equals(originalResourceVersion) @@ -245,7 +242,6 @@ private void cacheUpdatedResourceIfChanged( private void cleanupAfterDeletedEvent(String customResourceUid) { eventSourceManager.cleanup(customResourceUid); eventBuffer.cleanup(customResourceUid); - customResourceCache.cleanup(customResourceUid); } private boolean isControllerUnderExecution(String customResourceUid) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java index 7da0b8d607..c7f5b20cce 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java @@ -15,21 +15,19 @@ import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import io.javaoperatorsdk.operator.processing.event.EventList; -import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Dispatches events to the Controller and handles Finalizers for a single type of Custom Resource. */ -public class EventDispatcher { +class EventDispatcher { private static final Logger log = LoggerFactory.getLogger(EventDispatcher.class); private final ResourceController controller; private final String resourceFinalizer; private final CustomResourceFacade customResourceFacade; - private EventSourceManager eventSourceManager; EventDispatcher( ResourceController controller, @@ -47,10 +45,6 @@ public EventDispatcher( this(controller, finalizer, new CustomResourceFacade<>(client)); } - public void setEventSourceManager(EventSourceManager eventSourceManager) { - this.eventSourceManager = eventSourceManager; - } - public PostExecutionControl handleExecution(ExecutionScope executionScope) { try { return handleDispatch(executionScope); @@ -87,9 +81,7 @@ private PostExecutionControl handleDispatch(ExecutionScope executionScope) { } Context context = new DefaultContext<>( - eventSourceManager, - new EventList(executionScope.getEvents()), - executionScope.getRetryInfo()); + new EventList(executionScope.getEvents()), executionScope.getRetryInfo()); if (resource.isMarkedForDeletion()) { return handleDelete(resource, context); } else { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 4a8a40517e..a3ea01461b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -1,6 +1,14 @@ package io.javaoperatorsdk.operator.processing.event; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.javaoperatorsdk.operator.api.ResourceController; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.DefaultEventHandler; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import java.util.Collections; import java.util.Map; @@ -8,12 +16,14 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DefaultEventSourceManager implements EventSourceManager { public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source"; + private static final String CUSTOM_RESOURCE_EVENT_SOURCE_NAME = "custom-resource-event-source"; private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); private final ReentrantLock lock = new ReentrantLock(); @@ -21,14 +31,24 @@ public class DefaultEventSourceManager implements EventSourceManager { private final DefaultEventHandler defaultEventHandler; private TimerEventSource retryTimerEventSource; - public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { + DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { this.defaultEventHandler = defaultEventHandler; + defaultEventHandler.setEventSourceManager(this); if (supportRetry) { this.retryTimerEventSource = new TimerEventSource(); registerEventSource(RETRY_TIMER_EVENT_SOURCE_NAME, retryTimerEventSource); } } + public > DefaultEventSourceManager( + ResourceController controller, + ControllerConfiguration configuration, + MixedOperation, Resource> client) { + this(new DefaultEventHandler(controller, configuration, client), true); + registerEventSource( + CUSTOM_RESOURCE_EVENT_SOURCE_NAME, new CustomResourceEventSource<>(client, configuration)); + } + @Override public void close() { try { @@ -116,5 +136,24 @@ public void cleanup(String customResourceUid) { .keySet() .forEach(k -> deRegisterCustomResourceFromEventSource(k, customResourceUid)); eventSources.remove(customResourceUid); + getCache().cleanup(customResourceUid); + } + + // todo: remove + public CustomResourceCache getCache() { + final var source = + (CustomResourceEventSource) + getRegisteredEventSources().get(CUSTOM_RESOURCE_EVENT_SOURCE_NAME); + return source.getCache(); + } + + // todo: remove + public Optional getLatestResource(String customResourceUid) { + return getCache().getLatestResource(customResourceUid); + } + + // todo: remove + public void cacheResource(CustomResource resource, Predicate predicate) { + getCache().cacheResource(resource, predicate); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionDescriptor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionDescriptor.java deleted file mode 100644 index 676b355412..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionDescriptor.java +++ /dev/null @@ -1,34 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event; - -import io.javaoperatorsdk.operator.processing.ExecutionScope; -import io.javaoperatorsdk.operator.processing.PostExecutionControl; -import java.time.LocalDateTime; - -public class ExecutionDescriptor { - - private final ExecutionScope executionScope; - private final PostExecutionControl postExecutionControl; - private final LocalDateTime executionFinishedAt; - - public ExecutionDescriptor( - ExecutionScope executionScope, - PostExecutionControl postExecutionControl, - LocalDateTime executionFinishedAt) { - this.executionScope = executionScope; - this.postExecutionControl = postExecutionControl; - - this.executionFinishedAt = executionFinishedAt; - } - - public ExecutionScope getExecutionScope() { - return executionScope; - } - - public PostExecutionControl getPostExecutionControl() { - return postExecutionControl; - } - - public String getCustomResourceUid() { - return executionScope.getCustomResourceUid(); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index ce0b190f64..871c52a8a2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -3,11 +3,13 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.CustomResourceCache; @@ -22,83 +24,85 @@ import org.slf4j.LoggerFactory; /** This is a special case since is not bound to a single custom resource */ -public class CustomResourceEventSource extends AbstractEventSource - implements Watcher { +public class CustomResourceEventSource> extends AbstractEventSource + implements Watcher { private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class); - private final CustomResourceCache resourceCache; - private final MixedOperation client; - private final String[] targetNamespaces; + private final CustomResourceOperationsImpl> client; + private final Set targetNamespaces; private final boolean generationAware; private final String resourceFinalizer; private final Map lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>(); private final List watches; private final String resClass; - - public static CustomResourceEventSource customResourceEventSourceForAllNamespaces( - CustomResourceCache customResourceCache, - MixedOperation client, - boolean generationAware, - String resourceFinalizer, - Class resClass) { - return new CustomResourceEventSource( - customResourceCache, client, null, generationAware, resourceFinalizer, resClass); + private final CustomResourceCache customResourceCache; + + public CustomResourceEventSource( + MixedOperation, Resource> client, + ControllerConfiguration configuration) { + this( + client, + configuration.getEffectiveNamespaces(), + configuration.isGenerationAware(), + configuration.getFinalizer(), + configuration.getCustomResourceClass(), + new CustomResourceCache(configuration.getConfigurationService().getObjectMapper())); } - public static CustomResourceEventSource customResourceEventSourceForTargetNamespaces( - CustomResourceCache customResourceCache, - MixedOperation client, - String[] namespaces, + CustomResourceEventSource( + MixedOperation, Resource> client, + Set targetNamespaces, boolean generationAware, String resourceFinalizer, - Class resClass) { - return new CustomResourceEventSource( - customResourceCache, client, namespaces, generationAware, resourceFinalizer, resClass); + Class resClass) { + this( + client, + targetNamespaces, + generationAware, + resourceFinalizer, + resClass, + new CustomResourceCache()); } - private CustomResourceEventSource( - CustomResourceCache customResourceCache, - MixedOperation client, - String[] targetNamespaces, + CustomResourceEventSource( + MixedOperation, Resource> client, + Set targetNamespaces, boolean generationAware, String resourceFinalizer, - Class resClass) { - this.resourceCache = customResourceCache; - this.client = client; + Class resClass, + CustomResourceCache customResourceCache) { + this.client = (CustomResourceOperationsImpl>) client; this.targetNamespaces = targetNamespaces; this.generationAware = generationAware; this.resourceFinalizer = resourceFinalizer; this.watches = new ArrayList<>(); this.resClass = resClass.getName(); + this.customResourceCache = customResourceCache; } @Override public void start() { - CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client; - if (ControllerConfiguration.allNamespacesWatched(Set.of(targetNamespaces))) { - var w = crClient.inAnyNamespace().watch(this); + if (ControllerConfiguration.allNamespacesWatched(targetNamespaces)) { + var w = client.inAnyNamespace().watch(this); watches.add(w); log.debug("Registered controller {} -> {} for any namespace", resClass, w); - } else if (targetNamespaces.length == 0) { - var w = client.watch(this); - watches.add(w); - log.debug( - "Registered controller {} -> {} for namespace {}", resClass, w, crClient.getNamespace()); } else { - for (String targetNamespace : targetNamespaces) { - var w = crClient.inNamespace(targetNamespace).watch(this); - watches.add(w); - log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, targetNamespace); - } + targetNamespaces.forEach( + ns -> { + var w = client.inNamespace(ns).watch(this); + watches.add(w); + log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, ns); + }); } } @Override public void close() { + eventHandler.close(); for (Watch watch : this.watches) { try { - log.debug("Closing watch {} -> {}", resClass, watch); + log.info("Closing watch {} -> {}", resClass, watch); watch.close(); } catch (Exception e) { log.warn("Error closing watcher {} -> {}", resClass, watch, e); @@ -107,14 +111,15 @@ public void close() { } @Override - public void eventReceived(Watcher.Action action, CustomResource customResource) { + public void eventReceived(Watcher.Action action, T customResource) { log.debug( "Event received for action: {}, resource: {}", action.name(), customResource.getMetadata().getName()); - resourceCache.cacheResource( - customResource); // always store the latest event. Outside the sync block is intentional. + // cache the latest version of the CR + customResourceCache.cacheResource(customResource); + if (action == Action.ERROR) { log.debug( "Skipping {} event for custom resource uid: {}, version: {}", @@ -124,7 +129,7 @@ public void eventReceived(Watcher.Action action, CustomResource customResource) return; } - if (!skipBecauseOfGenerations(customResource)) { + if (!skipBecauseOfGeneration(customResource)) { eventHandler.handleEvent(new CustomResourceEvent(action, customResource, this)); markLastGenerationProcessed(customResource); } else { @@ -135,14 +140,14 @@ public void eventReceived(Watcher.Action action, CustomResource customResource) } } - private void markLastGenerationProcessed(CustomResource resource) { + private void markLastGenerationProcessed(T resource) { if (generationAware && resource.hasFinalizer(resourceFinalizer)) { lastGenerationProcessedSuccessfully.put( KubernetesResourceUtils.getUID(resource), resource.getMetadata().getGeneration()); } } - private boolean skipBecauseOfGenerations(CustomResource customResource) { + private boolean skipBecauseOfGeneration(T customResource) { if (!generationAware) { return false; } @@ -150,18 +155,14 @@ private boolean skipBecauseOfGenerations(CustomResource customResource) { if (customResource.isMarkedForDeletion()) { return false; } - if (!largerGenerationThenProcessedBefore(customResource)) { - return true; - } - return false; - } - public boolean largerGenerationThenProcessedBefore(CustomResource resource) { - Long lastGeneration = lastGenerationProcessedSuccessfully.get(resource.getMetadata().getUid()); + // only proceed if we haven't already seen this custom resource generation + Long lastGeneration = + lastGenerationProcessedSuccessfully.get(customResource.getMetadata().getUid()); if (lastGeneration == null) { - return true; + return false; } else { - return resource.getMetadata().getGeneration() > lastGeneration; + return customResource.getMetadata().getGeneration() <= lastGeneration; } } @@ -191,4 +192,9 @@ public void onClose(WatcherException e) { System.exit(1); } } + + // todo: remove + public CustomResourceCache getCache() { + return customResourceCache; + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index b998518d53..947ac74be8 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -4,6 +4,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; @@ -12,6 +14,7 @@ import static org.mockito.Mockito.when; import io.fabric8.kubernetes.client.Watcher; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; @@ -39,17 +42,22 @@ class DefaultEventHandlerTest { private CustomResourceCache customResourceCache = new CustomResourceCache(); private DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); + private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private DefaultEventHandler defaultEventHandler = - new DefaultEventHandler(customResourceCache, eventDispatcherMock, "Test", null); + new DefaultEventHandler( + eventDispatcherMock, + "Test", + null, + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); private DefaultEventHandler defaultEventHandlerWithRetry = new DefaultEventHandler( - customResourceCache, eventDispatcherMock, "Test", - GenericRetry.defaultLimitedExponentialRetry()); + GenericRetry.defaultLimitedExponentialRetry(), + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); @BeforeEach public void setup() { @@ -57,6 +65,19 @@ public void setup() { .thenReturn(retryTimerEventSourceMock); defaultEventHandler.setEventSourceManager(defaultEventSourceManagerMock); defaultEventHandlerWithRetry.setEventSourceManager(defaultEventSourceManagerMock); + + // todo: remove + when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache); + doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any()); + doCallRealMethod().when(defaultEventSourceManagerMock).cacheResource(any(), any()); + doAnswer( + invocation -> { + final var resourceId = (String) invocation.getArgument(0); + customResourceCache.cleanup(resourceId); + return null; + }) + .when(defaultEventSourceManagerMock) + .cleanup(any()); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java index 39ae1c5517..42fa9fa95d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java @@ -34,6 +34,20 @@ public void registersEventSource() { assertThat(registeredSources.entrySet()).hasSize(1); assertThat(registeredSources.get(CUSTOM_EVENT_SOURCE_NAME)).isEqualTo(eventSource); verify(eventSource, times(1)).setEventHandler(eq(defaultEventHandlerMock)); + verify(eventSource, times(1)).start(); + } + + @Test + public void closeShouldCascadeToEventSources() { + EventSource eventSource = mock(EventSource.class); + EventSource eventSource2 = mock(EventSource.class); + defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); + defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME + "2", eventSource2); + + defaultEventSourceManager.close(); + + verify(eventSource, times(1)).close(); + verify(eventSource2, times(1)).close(); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java index 9bfd8e91e1..76c289b094 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java @@ -5,10 +5,10 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl; import io.javaoperatorsdk.operator.TestUtils; -import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import java.time.LocalDateTime; @@ -19,13 +19,12 @@ class CustomResourceEventSourceTest { public static final String FINALIZER = "finalizer"; - CustomResourceCache customResourceCache = new CustomResourceCache(); - MixedOperation mixedOperation = mock(MixedOperation.class); + CustomResourceOperationsImpl> + client = mock(CustomResourceOperationsImpl.class); EventHandler eventHandler = mock(EventHandler.class); - private CustomResourceEventSource customResourceEventSource = - CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, mixedOperation, true, FINALIZER, TestCustomResource.class); + private CustomResourceEventSource customResourceEventSource = + new CustomResourceEventSource<>(client, null, true, FINALIZER, TestCustomResource.class); @BeforeEach public void setup() { @@ -72,8 +71,7 @@ public void normalExecutionIfGenerationChanges() { @Test public void handlesAllEventIfNotGenerationAware() { customResourceEventSource = - CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, mixedOperation, false, FINALIZER, TestCustomResource.class); + new CustomResourceEventSource<>(client, null, false, FINALIZER, TestCustomResource.class); setup(); TestCustomResource customResource1 = TestUtils.testCustomResource(); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/runtime/DefaultConfigurationServiceTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/runtime/DefaultConfigurationServiceTest.java index c7c8b2a0d9..f891208b14 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/runtime/DefaultConfigurationServiceTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/runtime/DefaultConfigurationServiceTest.java @@ -10,7 +10,6 @@ import io.javaoperatorsdk.operator.ControllerUtils; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.DeleteControl; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import org.junit.jupiter.api.Test; @@ -59,13 +58,6 @@ static class TestCustomFinalizerController @Version("v1") public class InnerCustomResource extends CustomResource {} - @Override - public DeleteControl deleteResource( - TestCustomFinalizerController.InnerCustomResource resource, - Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( InnerCustomResource resource, Context context) { @@ -75,11 +67,6 @@ public UpdateControl createOr @Controller(generationAwareEventProcessing = false, name = "test") static class TestCustomResourceController implements ResourceController { - @Override - public DeleteControl deleteResource( - TestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } @Override public UpdateControl createOrUpdateResource( diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/doubleupdate/DoubleUpdateTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/doubleupdate/DoubleUpdateTestCustomResourceController.java index 613a1eba79..e3959681d9 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/doubleupdate/DoubleUpdateTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/doubleupdate/DoubleUpdateTestCustomResourceController.java @@ -17,12 +17,6 @@ public class DoubleUpdateTestCustomResourceController public static final String TEST_ANNOTATION_VALUE = "TestAnnotationValue"; private final AtomicInteger numberOfExecutions = new AtomicInteger(0); - @Override - public DeleteControl deleteResource( - DoubleUpdateTestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( DoubleUpdateTestCustomResource resource, Context context) { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java index e77c7bdd14..7f42429b5f 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java @@ -5,7 +5,6 @@ import io.javaoperatorsdk.operator.TestExecutionInfoProvider; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.DeleteControl; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; @@ -33,12 +32,6 @@ public void init(EventSourceManager eventSourceManager) { eventSourceManager.registerEventSource("Timer", timerEventSource); } - @Override - public DeleteControl deleteResource( - EventSourceTestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( EventSourceTestCustomResource resource, Context context) { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java index 0bc1e227cd..512a0a55a4 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java @@ -5,7 +5,6 @@ import io.javaoperatorsdk.operator.TestExecutionInfoProvider; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.DeleteControl; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import java.util.concurrent.atomic.AtomicInteger; @@ -25,12 +24,6 @@ public class RetryTestCustomResourceController LoggerFactory.getLogger(RetryTestCustomResourceController.class); private final AtomicInteger numberOfExecutions = new AtomicInteger(0); - @Override - public DeleteControl deleteResource( - RetryTestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( RetryTestCustomResource resource, Context context) { @@ -44,7 +37,7 @@ public UpdateControl createOrUpdateResource( if (numberOfExecutions.get() < NUMBER_FAILED_EXECUTIONS + 1) { throw new RuntimeException("Testing Retry"); } - if (context.getRetryInfo().isEmpty() || context.getRetryInfo().get().isLastAttempt() == true) { + if (context.getRetryInfo().isEmpty() || context.getRetryInfo().get().isLastAttempt()) { throw new IllegalStateException("Not expected retry info: " + context.getRetryInfo()); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/subresource/SubResourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/subresource/SubResourceTestCustomResourceController.java index 0493f395b2..50f0285b22 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/subresource/SubResourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/subresource/SubResourceTestCustomResourceController.java @@ -5,7 +5,6 @@ import io.javaoperatorsdk.operator.TestExecutionInfoProvider; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.DeleteControl; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import java.util.concurrent.atomic.AtomicInteger; @@ -23,12 +22,6 @@ public class SubResourceTestCustomResourceController LoggerFactory.getLogger(SubResourceTestCustomResourceController.class); private final AtomicInteger numberOfExecutions = new AtomicInteger(0); - @Override - public DeleteControl deleteResource( - SubResourceTestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( SubResourceTestCustomResource resource, Context context) {