From f7fe13f19de4b7ce26af2df0882d49cdf3c33731 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 6 May 2021 15:06:44 +0200 Subject: [PATCH 01/18] feat: simplify wiring of event sources --- .../io/javaoperatorsdk/operator/Operator.java | 68 ++----------------- .../processing/DefaultEventHandler.java | 36 ++++++---- .../event/DefaultEventSourceManager.java | 39 ++++++++++- .../internal/CustomResourceEventSource.java | 33 ++------- .../processing/DefaultEventHandlerTest.java | 15 ++-- .../CustomResourceEventSourceTest.java | 10 ++- 6 files changed, 84 insertions(+), 117 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 8d1956fe18..90aac65add 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -4,16 +4,10 @@ import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.Version; -import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.CustomResourceCache; -import io.javaoperatorsdk.operator.processing.DefaultEventHandler; -import io.javaoperatorsdk.operator.processing.EventDispatcher; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; -import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -115,18 +109,7 @@ public void register( configuration = existing; } - final var retry = GenericRetry.fromConfiguration(configuration.getRetryConfiguration()); - - // check if we only want to watch the current namespace - var targetNamespaces = configuration.getNamespaces().toArray(new String[] {}); - if (configuration.watchCurrentNamespace()) { - targetNamespaces = - new String[] {configurationService.getClientConfiguration().getNamespace()}; - } - Class resClass = configuration.getCustomResourceClass(); - String finalizer = configuration.getFinalizer(); - final String controllerName = configuration.getName(); // check that the custom resource is known by the cluster if configured that way @@ -148,61 +131,18 @@ public void register( } final var client = k8sClient.customResources(resClass); - EventDispatcher dispatcher = new EventDispatcher<>(controller, finalizer, client); - - CustomResourceCache customResourceCache = - new CustomResourceCache(configurationService.getObjectMapper()); - DefaultEventHandler defaultEventHandler = - new DefaultEventHandler( - customResourceCache, - dispatcher, - controllerName, - retry, - configurationService.concurrentReconciliationThreads()); DefaultEventSourceManager eventSourceManager = - new DefaultEventSourceManager(defaultEventHandler, retry != null); - defaultEventHandler.setEventSourceManager(eventSourceManager); - dispatcher.setEventSourceManager(eventSourceManager); - + new DefaultEventSourceManager(controller, configuration, client); controller.init(eventSourceManager); - final boolean watchAllNamespaces = configuration.watchAllNamespaces(); - CustomResourceEventSource customResourceEventSource = - createCustomResourceEventSource( - client, - customResourceCache, - watchAllNamespaces, - targetNamespaces, - configuration.isGenerationAware(), - finalizer, - resClass); - - closeables.add(customResourceEventSource); closeables.add(eventSourceManager); - customResourceEventSource.setEventHandler(defaultEventHandler); - customResourceEventSource.start(); - log.info( "Registered Controller: '{}' for CRD: '{}' for namespace(s): {}", controllerName, resClass, - watchAllNamespaces ? "[all namespaces]" : Arrays.toString(targetNamespaces)); + configuration.watchAllNamespaces() + ? "[all namespaces]" + : Arrays.toString(eventSourceManager.getTargetNamespaces())); } } - - private CustomResourceEventSource createCustomResourceEventSource( - MixedOperation client, - CustomResourceCache customResourceCache, - boolean watchAllNamespaces, - String[] targetNamespaces, - boolean generationAware, - String finalizer, - Class resClass) { - - return watchAllNamespaces - ? CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, client, generationAware, finalizer, resClass) - : CustomResourceEventSource.customResourceEventSourceForTargetNamespaces( - customResourceCache, client, targetNamespaces, generationAware, finalizer, resClass); - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 107a94e426..02e5246d9a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -5,11 +5,15 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.RetryInfo; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; +import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import java.util.HashMap; @@ -43,6 +47,16 @@ public class DefaultEventHandler implements EventHandler { private final ReentrantLock lock = new ReentrantLock(); public DefaultEventHandler( + ResourceController controller, ControllerConfiguration configuration, MixedOperation client) { + this( + new CustomResourceCache(configuration.getConfigurationService().getObjectMapper()), + new EventDispatcher(controller, configuration.getFinalizer(), client), + configuration.getName(), + GenericRetry.fromConfiguration(configuration.getRetryConfiguration()), + configuration.getConfigurationService().concurrentReconciliationThreads()); + } + + DefaultEventHandler( CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName, @@ -59,19 +73,6 @@ public DefaultEventHandler( runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName)); } - public DefaultEventHandler( - CustomResourceCache customResourceCache, - EventDispatcher eventDispatcher, - String relatedControllerName, - Retry retry) { - this( - customResourceCache, - eventDispatcher, - relatedControllerName, - retry, - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); - } - @Override public void close() { if (eventSourceManager != null) { @@ -84,10 +85,17 @@ public void close() { public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { this.eventSourceManager = eventSourceManager; + eventDispatcher.setEventSourceManager(eventSourceManager); } @Override public void handleEvent(Event event) { + // cache the latest version of the CR + if (event instanceof CustomResourceEvent) { + CustomResourceEvent crEvent = (CustomResourceEvent) event; + customResourceCache.cacheResource(crEvent.getCustomResource()); + } + try { lock.lock(); log.debug("Received event: {}", event); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index 4a8a40517e..f9753a0dea 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -1,6 +1,13 @@ package io.javaoperatorsdk.operator.processing.event; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.javaoperatorsdk.operator.api.ResourceController; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.DefaultEventHandler; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import java.util.Collections; import java.util.Map; @@ -14,21 +21,51 @@ public class DefaultEventSourceManager implements EventSourceManager { public static final String RETRY_TIMER_EVENT_SOURCE_NAME = "retry-timer-event-source"; + private static final String CUSTOM_RESOURCE_EVENT_SOURCE_NAME = "custom-resource-event-source"; private static final Logger log = LoggerFactory.getLogger(DefaultEventSourceManager.class); private final ReentrantLock lock = new ReentrantLock(); private final Map eventSources = new ConcurrentHashMap<>(); private final DefaultEventHandler defaultEventHandler; + private String[] targetNamespaces; private TimerEventSource retryTimerEventSource; - public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { + DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { this.defaultEventHandler = defaultEventHandler; + defaultEventHandler.setEventSourceManager(this); if (supportRetry) { this.retryTimerEventSource = new TimerEventSource(); registerEventSource(RETRY_TIMER_EVENT_SOURCE_NAME, retryTimerEventSource); } } + public String[] getTargetNamespaces() { + return targetNamespaces; + } + + public DefaultEventSourceManager( + ResourceController controller, + ControllerConfiguration configuration, + MixedOperation, Resource> client) { + this(new DefaultEventHandler(controller, configuration, client), true); + // check if we only want to watch the current namespace + targetNamespaces = configuration.getNamespaces().toArray(new String[] {}); + if (configuration.watchCurrentNamespace()) { + targetNamespaces = + new String[] { + configuration.getConfigurationService().getClientConfiguration().getNamespace() + }; + } + registerEventSource( + CUSTOM_RESOURCE_EVENT_SOURCE_NAME, + new CustomResourceEventSource( + client, + targetNamespaces, + configuration.isGenerationAware(), + configuration.getFinalizer(), + configuration.getCustomResourceClass())); + } + @Override public void close() { try { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index ce0b190f64..0ec9e48ff6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -10,7 +10,6 @@ import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils; import io.javaoperatorsdk.operator.processing.event.AbstractEventSource; import java.util.ArrayList; @@ -27,7 +26,6 @@ public class CustomResourceEventSource extends AbstractEventSource private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class); - private final CustomResourceCache resourceCache; private final MixedOperation client; private final String[] targetNamespaces; private final boolean generationAware; @@ -36,35 +34,12 @@ public class CustomResourceEventSource extends AbstractEventSource private final List watches; private final String resClass; - public static CustomResourceEventSource customResourceEventSourceForAllNamespaces( - CustomResourceCache customResourceCache, - MixedOperation client, - boolean generationAware, - String resourceFinalizer, - Class resClass) { - return new CustomResourceEventSource( - customResourceCache, client, null, generationAware, resourceFinalizer, resClass); - } - - public static CustomResourceEventSource customResourceEventSourceForTargetNamespaces( - CustomResourceCache customResourceCache, - MixedOperation client, - String[] namespaces, - boolean generationAware, - String resourceFinalizer, - Class resClass) { - return new CustomResourceEventSource( - customResourceCache, client, namespaces, generationAware, resourceFinalizer, resClass); - } - - private CustomResourceEventSource( - CustomResourceCache customResourceCache, + public CustomResourceEventSource( MixedOperation client, String[] targetNamespaces, boolean generationAware, String resourceFinalizer, Class resClass) { - this.resourceCache = customResourceCache; this.client = client; this.targetNamespaces = targetNamespaces; this.generationAware = generationAware; @@ -73,6 +48,10 @@ private CustomResourceEventSource( this.resClass = resClass.getName(); } + public String[] getTargetNamespaces() { + return targetNamespaces; + } + @Override public void start() { CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client; @@ -113,8 +92,6 @@ public void eventReceived(Watcher.Action action, CustomResource customResource) action.name(), customResource.getMetadata().getName()); - resourceCache.cacheResource( - customResource); // always store the latest event. Outside the sync block is intentional. if (action == Action.ERROR) { log.debug( "Skipping {} event for custom resource uid: {}, version: {}", diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index b998518d53..fe38955e3b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -12,6 +12,7 @@ import static org.mockito.Mockito.when; import io.fabric8.kubernetes.client.Watcher; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; @@ -42,14 +43,20 @@ class DefaultEventHandlerTest { private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private DefaultEventHandler defaultEventHandler = - new DefaultEventHandler(customResourceCache, eventDispatcherMock, "Test", null); + new DefaultEventHandler( + customResourceCache, + eventDispatcherMock, + "Test", + null, + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); private DefaultEventHandler defaultEventHandlerWithRetry = new DefaultEventHandler( customResourceCache, eventDispatcherMock, "Test", - GenericRetry.defaultLimitedExponentialRetry()); + GenericRetry.defaultLimitedExponentialRetry(), + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); @BeforeEach public void setup() { @@ -66,7 +73,7 @@ public void dispatchesEventsIfNoExecutionInProgress() { verify(eventDispatcherMock, timeout(50).times(1)).handleExecution(any()); } - @Test + /*@Test public void skipProcessingIfLatestCustomResourceNotInCache() { Event event = prepareCREvent(); customResourceCache.cleanup(event.getRelatedCustomResourceUid()); @@ -74,7 +81,7 @@ public void skipProcessingIfLatestCustomResourceNotInCache() { defaultEventHandler.handleEvent(event); verify(eventDispatcherMock, timeout(50).times(0)).handleExecution(any()); - } + }*/ @Test public void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedException { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java index 9bfd8e91e1..62e60a045f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java @@ -8,7 +8,6 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.javaoperatorsdk.operator.TestUtils; -import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import java.time.LocalDateTime; @@ -19,13 +18,12 @@ class CustomResourceEventSourceTest { public static final String FINALIZER = "finalizer"; - CustomResourceCache customResourceCache = new CustomResourceCache(); MixedOperation mixedOperation = mock(MixedOperation.class); EventHandler eventHandler = mock(EventHandler.class); private CustomResourceEventSource customResourceEventSource = - CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, mixedOperation, true, FINALIZER, TestCustomResource.class); + new CustomResourceEventSource( + mixedOperation, null, true, FINALIZER, TestCustomResource.class); @BeforeEach public void setup() { @@ -72,8 +70,8 @@ public void normalExecutionIfGenerationChanges() { @Test public void handlesAllEventIfNotGenerationAware() { customResourceEventSource = - CustomResourceEventSource.customResourceEventSourceForAllNamespaces( - customResourceCache, mixedOperation, false, FINALIZER, TestCustomResource.class); + new CustomResourceEventSource( + mixedOperation, null, false, FINALIZER, TestCustomResource.class); setup(); TestCustomResource customResource1 = TestUtils.testCustomResource(); From 467a2423fbbe116f901e760c45c86c4d3cbd06ca Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 6 May 2021 16:09:16 +0200 Subject: [PATCH 02/18] fix: set ConfigurationService when registering --- .../operator/api/config/AbstractConfigurationService.java | 1 + .../api/config/ControllerConfigurationOverrider.java | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java index de83fd9b36..0990907743 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java @@ -35,6 +35,7 @@ private void put( } } configurations.put(name, config); + config.setConfigurationService(this); } protected void throwExceptionOnNameCollision( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index be8c82af02..10743fba7f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -70,6 +70,11 @@ public ControllerConfiguration build() { public Class getCustomResourceClass() { return original.getCustomResourceClass(); } + + @Override + public ConfigurationService getConfigurationService() { + return original.getConfigurationService(); + } }; } From bdfde05ba514e39280c19145855d3310a89b08b2 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 6 May 2021 17:29:54 +0200 Subject: [PATCH 03/18] fix: add shutdown hook to close watchers on exit --- .../src/main/java/io/javaoperatorsdk/operator/Operator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 90aac65add..b7df41315b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -28,6 +28,8 @@ public Operator(KubernetesClient k8sClient, ConfigurationService configurationSe this.k8sClient = k8sClient; this.configurationService = configurationService; this.closeables = new ArrayList<>(); + + Runtime.getRuntime().addShutdownHook(new Thread(this::close)); } /** From 529007fcb3fbf34bf703f1d7db457ebd41f7574f Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 6 May 2021 17:23:56 +0000 Subject: [PATCH 04/18] feat: introduce getEffectiveNamespaces on ControllerConfiguration --- .../io/javaoperatorsdk/operator/Operator.java | 3 +-- .../api/config/ControllerConfiguration.java | 9 ++++++++ .../event/DefaultEventSourceManager.java | 15 +----------- .../internal/CustomResourceEventSource.java | 23 ++++++++----------- 4 files changed, 21 insertions(+), 29 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index b7df41315b..9bcd72673d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -11,7 +11,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,7 +143,7 @@ public void register( resClass, configuration.watchAllNamespaces() ? "[all namespaces]" - : Arrays.toString(eventSourceManager.getTargetNamespaces())); + : configuration.getEffectiveNamespaces()); } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index a0fb7616f2..599ff46533 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -41,6 +41,15 @@ static boolean currentNamespaceWatched(Set namespaces) { && namespaces.contains(Controller.WATCH_CURRENT_NAMESPACE); } + default Set getEffectiveNamespaces() { + var targetNamespaces = getNamespaces(); + if (watchCurrentNamespace()) { + targetNamespaces = + Collections.singleton(getConfigurationService().getClientConfiguration().getNamespace()); + } + return targetNamespaces; + } + default RetryConfiguration getRetryConfiguration() { return RetryConfiguration.DEFAULT; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index f9753a0dea..fcb3907806 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -27,7 +27,6 @@ public class DefaultEventSourceManager implements EventSourceManager { private final ReentrantLock lock = new ReentrantLock(); private final Map eventSources = new ConcurrentHashMap<>(); private final DefaultEventHandler defaultEventHandler; - private String[] targetNamespaces; private TimerEventSource retryTimerEventSource; DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolean supportRetry) { @@ -39,28 +38,16 @@ public class DefaultEventSourceManager implements EventSourceManager { } } - public String[] getTargetNamespaces() { - return targetNamespaces; - } - public DefaultEventSourceManager( ResourceController controller, ControllerConfiguration configuration, MixedOperation, Resource> client) { this(new DefaultEventHandler(controller, configuration, client), true); - // check if we only want to watch the current namespace - targetNamespaces = configuration.getNamespaces().toArray(new String[] {}); - if (configuration.watchCurrentNamespace()) { - targetNamespaces = - new String[] { - configuration.getConfigurationService().getClientConfiguration().getNamespace() - }; - } registerEventSource( CUSTOM_RESOURCE_EVENT_SOURCE_NAME, new CustomResourceEventSource( client, - targetNamespaces, + configuration.getEffectiveNamespaces(), configuration.isGenerationAware(), configuration.getFinalizer(), configuration.getCustomResourceClass())); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 0ec9e48ff6..3320952c34 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -27,7 +27,7 @@ public class CustomResourceEventSource extends AbstractEventSource private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class); private final MixedOperation client; - private final String[] targetNamespaces; + private final Set targetNamespaces; private final boolean generationAware; private final String resourceFinalizer; private final Map lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>(); @@ -36,7 +36,7 @@ public class CustomResourceEventSource extends AbstractEventSource public CustomResourceEventSource( MixedOperation client, - String[] targetNamespaces, + Set targetNamespaces, boolean generationAware, String resourceFinalizer, Class resClass) { @@ -48,28 +48,25 @@ public CustomResourceEventSource( this.resClass = resClass.getName(); } - public String[] getTargetNamespaces() { - return targetNamespaces; - } - @Override public void start() { CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client; - if (ControllerConfiguration.allNamespacesWatched(Set.of(targetNamespaces))) { + if (ControllerConfiguration.allNamespacesWatched(targetNamespaces)) { var w = crClient.inAnyNamespace().watch(this); watches.add(w); log.debug("Registered controller {} -> {} for any namespace", resClass, w); - } else if (targetNamespaces.length == 0) { + } else if (targetNamespaces.isEmpty()) { var w = client.watch(this); watches.add(w); log.debug( "Registered controller {} -> {} for namespace {}", resClass, w, crClient.getNamespace()); } else { - for (String targetNamespace : targetNamespaces) { - var w = crClient.inNamespace(targetNamespace).watch(this); - watches.add(w); - log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, targetNamespace); - } + targetNamespaces.forEach( + ns -> { + var w = crClient.inNamespace(ns).watch(this); + watches.add(w); + log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, ns); + }); } } From e020a50b9ffa7f937a2aff663918d134c3bc58ee Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 08:45:27 +0200 Subject: [PATCH 05/18] chore(tests): add more tests for DefaultEventSourceManager --- .../event/DefaultEventSourceManagerTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java index 39ae1c5517..42fa9fa95d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManagerTest.java @@ -34,6 +34,20 @@ public void registersEventSource() { assertThat(registeredSources.entrySet()).hasSize(1); assertThat(registeredSources.get(CUSTOM_EVENT_SOURCE_NAME)).isEqualTo(eventSource); verify(eventSource, times(1)).setEventHandler(eq(defaultEventHandlerMock)); + verify(eventSource, times(1)).start(); + } + + @Test + public void closeShouldCascadeToEventSources() { + EventSource eventSource = mock(EventSource.class); + EventSource eventSource2 = mock(EventSource.class); + defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME, eventSource); + defaultEventSourceManager.registerEventSource(CUSTOM_EVENT_SOURCE_NAME + "2", eventSource2); + + defaultEventSourceManager.close(); + + verify(eventSource, times(1)).close(); + verify(eventSource2, times(1)).close(); } @Test From 94d368bf4e9537b461545353d156ccdddb25e5a6 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 15:34:31 +0200 Subject: [PATCH 06/18] refactor: remove redundant branch in watch registration --- .../processing/event/internal/CustomResourceEventSource.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 3320952c34..b448bd2942 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -55,11 +55,6 @@ public void start() { var w = crClient.inAnyNamespace().watch(this); watches.add(w); log.debug("Registered controller {} -> {} for any namespace", resClass, w); - } else if (targetNamespaces.isEmpty()) { - var w = client.watch(this); - watches.add(w); - log.debug( - "Registered controller {} -> {} for namespace {}", resClass, w, crClient.getNamespace()); } else { targetNamespaces.forEach( ns -> { From 575db5c60c2f8aaa3715ed4a81316736ec7a1666 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 16:14:42 +0200 Subject: [PATCH 07/18] fix: properly close watches at the end of tests --- .../io/javaoperatorsdk/operator/IntegrationTestSupport.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java index e2cd129fb2..59cf083084 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java @@ -132,6 +132,8 @@ public void teardownIfSuccess(TestRun test) { try { test.run(); + operator.close(); + log.info("Deleting namespace {} and stopping operator", TEST_NAMESPACE); Namespace namespace = k8sClient.namespaces().withName(TEST_NAMESPACE).get(); if (namespace.getStatus().getPhase().equals("Active")) { From a1f1be8d96300c059c7d19a02e3ed43c570fbdf3 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 16:40:37 +0200 Subject: [PATCH 08/18] refactor: remove EventSourceManager from Context --- .../io/javaoperatorsdk/operator/api/Context.java | 3 --- .../javaoperatorsdk/operator/api/DefaultContext.java | 11 +---------- .../operator/processing/DefaultEventHandler.java | 1 - .../operator/processing/EventDispatcher.java | 12 ++---------- 4 files changed, 3 insertions(+), 24 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java index 7d04a5db93..10cba18985 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/Context.java @@ -2,13 +2,10 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.EventList; -import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import java.util.Optional; public interface Context { - EventSourceManager getEventSourceManager(); - EventList getEvents(); Optional getRetryInfo(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java index c922d674da..d650b8aa3b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/DefaultContext.java @@ -2,25 +2,16 @@ import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.processing.event.EventList; -import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import java.util.Optional; public class DefaultContext implements Context { private final RetryInfo retryInfo; private final EventList events; - private final EventSourceManager eventSourceManager; - public DefaultContext( - EventSourceManager eventSourceManager, EventList events, RetryInfo retryInfo) { + public DefaultContext(EventList events, RetryInfo retryInfo) { this.retryInfo = retryInfo; this.events = events; - this.eventSourceManager = eventSourceManager; - } - - @Override - public EventSourceManager getEventSourceManager() { - return eventSourceManager; } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 02e5246d9a..f270b9fb2c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -85,7 +85,6 @@ public void close() { public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { this.eventSourceManager = eventSourceManager; - eventDispatcher.setEventSourceManager(eventSourceManager); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java index 7da0b8d607..c7f5b20cce 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventDispatcher.java @@ -15,21 +15,19 @@ import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import io.javaoperatorsdk.operator.processing.event.EventList; -import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Dispatches events to the Controller and handles Finalizers for a single type of Custom Resource. */ -public class EventDispatcher { +class EventDispatcher { private static final Logger log = LoggerFactory.getLogger(EventDispatcher.class); private final ResourceController controller; private final String resourceFinalizer; private final CustomResourceFacade customResourceFacade; - private EventSourceManager eventSourceManager; EventDispatcher( ResourceController controller, @@ -47,10 +45,6 @@ public EventDispatcher( this(controller, finalizer, new CustomResourceFacade<>(client)); } - public void setEventSourceManager(EventSourceManager eventSourceManager) { - this.eventSourceManager = eventSourceManager; - } - public PostExecutionControl handleExecution(ExecutionScope executionScope) { try { return handleDispatch(executionScope); @@ -87,9 +81,7 @@ private PostExecutionControl handleDispatch(ExecutionScope executionScope) { } Context context = new DefaultContext<>( - eventSourceManager, - new EventList(executionScope.getEvents()), - executionScope.getRetryInfo()); + new EventList(executionScope.getEvents()), executionScope.getRetryInfo()); if (resource.isMarkedForDeletion()) { return handleDelete(resource, context); } else { From 1922f6625d2cce666e881323f2462a67cb623d4a Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 17:12:16 +0200 Subject: [PATCH 09/18] refactor: remove default deleteResource implementations --- .../runtime/DefaultConfigurationServiceTest.java | 13 ------------- .../DoubleUpdateTestCustomResourceController.java | 6 ------ .../EventSourceTestCustomResourceController.java | 7 ------- .../retry/RetryTestCustomResourceController.java | 7 ------- .../SubResourceTestCustomResourceController.java | 7 ------- 5 files changed, 40 deletions(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/runtime/DefaultConfigurationServiceTest.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/runtime/DefaultConfigurationServiceTest.java index c7c8b2a0d9..f891208b14 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/runtime/DefaultConfigurationServiceTest.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/config/runtime/DefaultConfigurationServiceTest.java @@ -10,7 +10,6 @@ import io.javaoperatorsdk.operator.ControllerUtils; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.DeleteControl; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import org.junit.jupiter.api.Test; @@ -59,13 +58,6 @@ static class TestCustomFinalizerController @Version("v1") public class InnerCustomResource extends CustomResource {} - @Override - public DeleteControl deleteResource( - TestCustomFinalizerController.InnerCustomResource resource, - Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( InnerCustomResource resource, Context context) { @@ -75,11 +67,6 @@ public UpdateControl createOr @Controller(generationAwareEventProcessing = false, name = "test") static class TestCustomResourceController implements ResourceController { - @Override - public DeleteControl deleteResource( - TestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } @Override public UpdateControl createOrUpdateResource( diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/doubleupdate/DoubleUpdateTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/doubleupdate/DoubleUpdateTestCustomResourceController.java index 613a1eba79..e3959681d9 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/doubleupdate/DoubleUpdateTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/doubleupdate/DoubleUpdateTestCustomResourceController.java @@ -17,12 +17,6 @@ public class DoubleUpdateTestCustomResourceController public static final String TEST_ANNOTATION_VALUE = "TestAnnotationValue"; private final AtomicInteger numberOfExecutions = new AtomicInteger(0); - @Override - public DeleteControl deleteResource( - DoubleUpdateTestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( DoubleUpdateTestCustomResource resource, Context context) { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java index e77c7bdd14..7f42429b5f 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/event/EventSourceTestCustomResourceController.java @@ -5,7 +5,6 @@ import io.javaoperatorsdk.operator.TestExecutionInfoProvider; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.DeleteControl; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; @@ -33,12 +32,6 @@ public void init(EventSourceManager eventSourceManager) { eventSourceManager.registerEventSource("Timer", timerEventSource); } - @Override - public DeleteControl deleteResource( - EventSourceTestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( EventSourceTestCustomResource resource, Context context) { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java index 0bc1e227cd..671e90a703 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java @@ -5,7 +5,6 @@ import io.javaoperatorsdk.operator.TestExecutionInfoProvider; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.DeleteControl; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import java.util.concurrent.atomic.AtomicInteger; @@ -25,12 +24,6 @@ public class RetryTestCustomResourceController LoggerFactory.getLogger(RetryTestCustomResourceController.class); private final AtomicInteger numberOfExecutions = new AtomicInteger(0); - @Override - public DeleteControl deleteResource( - RetryTestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( RetryTestCustomResource resource, Context context) { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/subresource/SubResourceTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/subresource/SubResourceTestCustomResourceController.java index 0493f395b2..50f0285b22 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/subresource/SubResourceTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/subresource/SubResourceTestCustomResourceController.java @@ -5,7 +5,6 @@ import io.javaoperatorsdk.operator.TestExecutionInfoProvider; import io.javaoperatorsdk.operator.api.Context; import io.javaoperatorsdk.operator.api.Controller; -import io.javaoperatorsdk.operator.api.DeleteControl; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.UpdateControl; import java.util.concurrent.atomic.AtomicInteger; @@ -23,12 +22,6 @@ public class SubResourceTestCustomResourceController LoggerFactory.getLogger(SubResourceTestCustomResourceController.class); private final AtomicInteger numberOfExecutions = new AtomicInteger(0); - @Override - public DeleteControl deleteResource( - SubResourceTestCustomResource resource, Context context) { - return DeleteControl.DEFAULT_DELETE; - } - @Override public UpdateControl createOrUpdateResource( SubResourceTestCustomResource resource, Context context) { From a9ed443e45f6d6f5324549278644aa3c9a986dbf Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 17:15:08 +0200 Subject: [PATCH 10/18] refactor: simplify condition --- .../sample/retry/RetryTestCustomResourceController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java index 671e90a703..512a0a55a4 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/retry/RetryTestCustomResourceController.java @@ -37,7 +37,7 @@ public UpdateControl createOrUpdateResource( if (numberOfExecutions.get() < NUMBER_FAILED_EXECUTIONS + 1) { throw new RuntimeException("Testing Retry"); } - if (context.getRetryInfo().isEmpty() || context.getRetryInfo().get().isLastAttempt() == true) { + if (context.getRetryInfo().isEmpty() || context.getRetryInfo().get().isLastAttempt()) { throw new IllegalStateException("Not expected retry info: " + context.getRetryInfo()); } From 69df932f5588275a8b9baf18bab95e145eebe503 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 18:13:34 +0200 Subject: [PATCH 11/18] refactor: improve method names --- .../event/internal/CustomResourceEventSource.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index b448bd2942..76fec43c22 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -93,7 +93,7 @@ public void eventReceived(Watcher.Action action, CustomResource customResource) return; } - if (!skipBecauseOfGenerations(customResource)) { + if (!skipBecauseOfGeneration(customResource)) { eventHandler.handleEvent(new CustomResourceEvent(action, customResource, this)); markLastGenerationProcessed(customResource); } else { @@ -111,7 +111,7 @@ private void markLastGenerationProcessed(CustomResource resource) { } } - private boolean skipBecauseOfGenerations(CustomResource customResource) { + private boolean skipBecauseOfGeneration(CustomResource customResource) { if (!generationAware) { return false; } @@ -119,13 +119,10 @@ private boolean skipBecauseOfGenerations(CustomResource customResource) { if (customResource.isMarkedForDeletion()) { return false; } - if (!largerGenerationThenProcessedBefore(customResource)) { - return true; - } - return false; + return !hasGenerationAlreadyBeenProcessed(customResource); } - public boolean largerGenerationThenProcessedBefore(CustomResource resource) { + private boolean hasGenerationAlreadyBeenProcessed(CustomResource resource) { Long lastGeneration = lastGenerationProcessedSuccessfully.get(resource.getMetadata().getUid()); if (lastGeneration == null) { return true; From 141ae35ceef1dcc359a8c10c2395ea18a829afa3 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 18:59:15 +0200 Subject: [PATCH 12/18] refactor: parameterize CustomResourceEventSource by the CustomResource --- .../event/DefaultEventSourceManager.java | 10 +---- .../internal/CustomResourceEventSource.java | 38 ++++++++++++------- .../CustomResourceEventSourceTest.java | 14 +++---- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index fcb3907806..cdfb84778f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -38,19 +38,13 @@ public class DefaultEventSourceManager implements EventSourceManager { } } - public DefaultEventSourceManager( + public > DefaultEventSourceManager( ResourceController controller, ControllerConfiguration configuration, MixedOperation, Resource> client) { this(new DefaultEventHandler(controller, configuration, client), true); registerEventSource( - CUSTOM_RESOURCE_EVENT_SOURCE_NAME, - new CustomResourceEventSource( - client, - configuration.getEffectiveNamespaces(), - configuration.isGenerationAware(), - configuration.getFinalizer(), - configuration.getCustomResourceClass())); + CUSTOM_RESOURCE_EVENT_SOURCE_NAME, new CustomResourceEventSource<>(client, configuration)); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 76fec43c22..9c1a2b91e0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -3,11 +3,13 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils; @@ -21,12 +23,12 @@ import org.slf4j.LoggerFactory; /** This is a special case since is not bound to a single custom resource */ -public class CustomResourceEventSource extends AbstractEventSource - implements Watcher { +public class CustomResourceEventSource> extends AbstractEventSource + implements Watcher { private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class); - private final MixedOperation client; + private final CustomResourceOperationsImpl> client; private final Set targetNamespaces; private final boolean generationAware; private final String resourceFinalizer; @@ -35,12 +37,23 @@ public class CustomResourceEventSource extends AbstractEventSource private final String resClass; public CustomResourceEventSource( - MixedOperation client, + MixedOperation, Resource> client, + ControllerConfiguration configuration) { + this( + client, + configuration.getEffectiveNamespaces(), + configuration.isGenerationAware(), + configuration.getFinalizer(), + configuration.getCustomResourceClass()); + } + + CustomResourceEventSource( + MixedOperation, Resource> client, Set targetNamespaces, boolean generationAware, String resourceFinalizer, - Class resClass) { - this.client = client; + Class resClass) { + this.client = (CustomResourceOperationsImpl>) client; this.targetNamespaces = targetNamespaces; this.generationAware = generationAware; this.resourceFinalizer = resourceFinalizer; @@ -50,15 +63,14 @@ public CustomResourceEventSource( @Override public void start() { - CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client; if (ControllerConfiguration.allNamespacesWatched(targetNamespaces)) { - var w = crClient.inAnyNamespace().watch(this); + var w = client.inAnyNamespace().watch(this); watches.add(w); log.debug("Registered controller {} -> {} for any namespace", resClass, w); } else { targetNamespaces.forEach( ns -> { - var w = crClient.inNamespace(ns).watch(this); + var w = client.inNamespace(ns).watch(this); watches.add(w); log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, ns); }); @@ -78,7 +90,7 @@ public void close() { } @Override - public void eventReceived(Watcher.Action action, CustomResource customResource) { + public void eventReceived(Watcher.Action action, T customResource) { log.debug( "Event received for action: {}, resource: {}", action.name(), @@ -104,14 +116,14 @@ public void eventReceived(Watcher.Action action, CustomResource customResource) } } - private void markLastGenerationProcessed(CustomResource resource) { + private void markLastGenerationProcessed(T resource) { if (generationAware && resource.hasFinalizer(resourceFinalizer)) { lastGenerationProcessedSuccessfully.put( KubernetesResourceUtils.getUID(resource), resource.getMetadata().getGeneration()); } } - private boolean skipBecauseOfGeneration(CustomResource customResource) { + private boolean skipBecauseOfGeneration(T customResource) { if (!generationAware) { return false; } @@ -122,7 +134,7 @@ private boolean skipBecauseOfGeneration(CustomResource customResource) { return !hasGenerationAlreadyBeenProcessed(customResource); } - private boolean hasGenerationAlreadyBeenProcessed(CustomResource resource) { + private boolean hasGenerationAlreadyBeenProcessed(T resource) { Long lastGeneration = lastGenerationProcessedSuccessfully.get(resource.getMetadata().getUid()); if (lastGeneration == null) { return true; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java index 62e60a045f..76c289b094 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java @@ -5,8 +5,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -18,12 +19,12 @@ class CustomResourceEventSourceTest { public static final String FINALIZER = "finalizer"; - MixedOperation mixedOperation = mock(MixedOperation.class); + CustomResourceOperationsImpl> + client = mock(CustomResourceOperationsImpl.class); EventHandler eventHandler = mock(EventHandler.class); - private CustomResourceEventSource customResourceEventSource = - new CustomResourceEventSource( - mixedOperation, null, true, FINALIZER, TestCustomResource.class); + private CustomResourceEventSource customResourceEventSource = + new CustomResourceEventSource<>(client, null, true, FINALIZER, TestCustomResource.class); @BeforeEach public void setup() { @@ -70,8 +71,7 @@ public void normalExecutionIfGenerationChanges() { @Test public void handlesAllEventIfNotGenerationAware() { customResourceEventSource = - new CustomResourceEventSource( - mixedOperation, null, false, FINALIZER, TestCustomResource.class); + new CustomResourceEventSource<>(client, null, false, FINALIZER, TestCustomResource.class); setup(); TestCustomResource customResource1 = TestUtils.testCustomResource(); From 0ff83f93de61c21919cbb60e439d429fa7d8fe13 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 19:15:59 +0200 Subject: [PATCH 13/18] chore: remove unused class --- .../processing/event/ExecutionDescriptor.java | 34 ------------------- 1 file changed, 34 deletions(-) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionDescriptor.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionDescriptor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionDescriptor.java deleted file mode 100644 index 676b355412..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionDescriptor.java +++ /dev/null @@ -1,34 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event; - -import io.javaoperatorsdk.operator.processing.ExecutionScope; -import io.javaoperatorsdk.operator.processing.PostExecutionControl; -import java.time.LocalDateTime; - -public class ExecutionDescriptor { - - private final ExecutionScope executionScope; - private final PostExecutionControl postExecutionControl; - private final LocalDateTime executionFinishedAt; - - public ExecutionDescriptor( - ExecutionScope executionScope, - PostExecutionControl postExecutionControl, - LocalDateTime executionFinishedAt) { - this.executionScope = executionScope; - this.postExecutionControl = postExecutionControl; - - this.executionFinishedAt = executionFinishedAt; - } - - public ExecutionScope getExecutionScope() { - return executionScope; - } - - public PostExecutionControl getPostExecutionControl() { - return postExecutionControl; - } - - public String getCustomResourceUid() { - return executionScope.getCustomResourceUid(); - } -} From 8c07bb5198078237f4ff80fe1f37b708d770a00d Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 22:47:51 +0200 Subject: [PATCH 14/18] fix: also close handler and wait a little for tasks to finish --- .../operator/processing/DefaultEventHandler.java | 11 ++++++----- .../event/internal/CustomResourceEventSource.java | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index f270b9fb2c..b6d94c4928 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,12 +76,12 @@ public DefaultEventHandler( @Override public void close() { - if (eventSourceManager != null) { - log.debug("Closing EventSourceManager {} -> {}", controllerName, eventSourceManager); - eventSourceManager.close(); + try { + log.debug("Closing handler for {}", controllerName); + executor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.debug("Exception closing handler for {}: {}", controllerName, e.getLocalizedMessage()); } - - executor.shutdownNow(); } public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 9c1a2b91e0..0b1022751c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -79,9 +79,10 @@ public void start() { @Override public void close() { + eventHandler.close(); for (Watch watch : this.watches) { try { - log.debug("Closing watch {} -> {}", resClass, watch); + log.info("Closing watch {} -> {}", resClass, watch); watch.close(); } catch (Exception e) { log.warn("Error closing watcher {} -> {}", resClass, watch, e); From fe01c769ea1671dc0cec88bff8a45f366800a14f Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 7 May 2021 22:49:31 +0200 Subject: [PATCH 15/18] fix: revert closing operator when tests are finished MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The problem is that if we close the operator then the delete events never get handled and the finalizers are not removed leading to the namespace never being deleted, thus failing the test… --- .../io/javaoperatorsdk/operator/IntegrationTestSupport.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java index 59cf083084..e2cd129fb2 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/IntegrationTestSupport.java @@ -132,8 +132,6 @@ public void teardownIfSuccess(TestRun test) { try { test.run(); - operator.close(); - log.info("Deleting namespace {} and stopping operator", TEST_NAMESPACE); Namespace namespace = k8sClient.namespaces().withName(TEST_NAMESPACE).get(); if (namespace.getStatus().getPhase().equals("Active")) { From fce710b8c64de522820a846d71414d36da76b918 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 11 May 2021 10:28:02 +0200 Subject: [PATCH 16/18] fix: restore previous caching behavior --- .../processing/DefaultEventHandler.java | 7 ------ .../internal/CustomResourceEventSource.java | 25 ++++++++++++++++++- .../processing/DefaultEventHandlerTest.java | 4 +-- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index b6d94c4928..a67d6d5fd6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -12,7 +12,6 @@ import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; -import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; @@ -90,12 +89,6 @@ public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) @Override public void handleEvent(Event event) { - // cache the latest version of the CR - if (event instanceof CustomResourceEvent) { - CustomResourceEvent crEvent = (CustomResourceEvent) event; - customResourceCache.cacheResource(crEvent.getCustomResource()); - } - try { lock.lock(); log.debug("Received event: {}", event); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 0b1022751c..0b60b7f730 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -12,6 +12,7 @@ import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils; import io.javaoperatorsdk.operator.processing.event.AbstractEventSource; import java.util.ArrayList; @@ -35,6 +36,7 @@ public class CustomResourceEventSource> extends A private final Map lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>(); private final List watches; private final String resClass; + private final CustomResourceCache customResourceCache; public CustomResourceEventSource( MixedOperation, Resource> client, @@ -44,7 +46,8 @@ public CustomResourceEventSource( configuration.getEffectiveNamespaces(), configuration.isGenerationAware(), configuration.getFinalizer(), - configuration.getCustomResourceClass()); + configuration.getCustomResourceClass(), + new CustomResourceCache(configuration.getConfigurationService().getObjectMapper())); } CustomResourceEventSource( @@ -53,12 +56,29 @@ public CustomResourceEventSource( boolean generationAware, String resourceFinalizer, Class resClass) { + this( + client, + targetNamespaces, + generationAware, + resourceFinalizer, + resClass, + new CustomResourceCache()); + } + + CustomResourceEventSource( + MixedOperation, Resource> client, + Set targetNamespaces, + boolean generationAware, + String resourceFinalizer, + Class resClass, + CustomResourceCache customResourceCache) { this.client = (CustomResourceOperationsImpl>) client; this.targetNamespaces = targetNamespaces; this.generationAware = generationAware; this.resourceFinalizer = resourceFinalizer; this.watches = new ArrayList<>(); this.resClass = resClass.getName(); + this.customResourceCache = customResourceCache; } @Override @@ -97,6 +117,9 @@ public void eventReceived(Watcher.Action action, T customResource) { action.name(), customResource.getMetadata().getName()); + // cache the latest version of the CR + customResourceCache.cacheResource(customResource); + if (action == Action.ERROR) { log.debug( "Skipping {} event for custom resource uid: {}, version: {}", diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index fe38955e3b..b2076723ef 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -73,7 +73,7 @@ public void dispatchesEventsIfNoExecutionInProgress() { verify(eventDispatcherMock, timeout(50).times(1)).handleExecution(any()); } - /*@Test + @Test public void skipProcessingIfLatestCustomResourceNotInCache() { Event event = prepareCREvent(); customResourceCache.cleanup(event.getRelatedCustomResourceUid()); @@ -81,7 +81,7 @@ public void skipProcessingIfLatestCustomResourceNotInCache() { defaultEventHandler.handleEvent(event); verify(eventDispatcherMock, timeout(50).times(0)).handleExecution(any()); - }*/ + } @Test public void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedException { From 145dcebacdd2bde9987b201354efabe39ef60186 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 11 May 2021 15:23:34 +0200 Subject: [PATCH 17/18] fix: DefaultEventHandler should get its cache from the CR event source --- .../processing/DefaultEventHandler.java | 9 ++------ .../event/DefaultEventSourceManager.java | 21 +++++++++++++++++++ .../internal/CustomResourceEventSource.java | 5 +++++ .../processing/DefaultEventHandlerTest.java | 18 ++++++++++++++-- 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index a67d6d5fd6..a082e770b0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -34,7 +34,6 @@ public class DefaultEventHandler implements EventHandler { private static final Logger log = LoggerFactory.getLogger(DefaultEventHandler.class); - private final CustomResourceCache customResourceCache; private final EventBuffer eventBuffer; private final Set underProcessing = new HashSet<>(); private final ScheduledThreadPoolExecutor executor; @@ -49,7 +48,6 @@ public class DefaultEventHandler implements EventHandler { public DefaultEventHandler( ResourceController controller, ControllerConfiguration configuration, MixedOperation client) { this( - new CustomResourceCache(configuration.getConfigurationService().getObjectMapper()), new EventDispatcher(controller, configuration.getFinalizer(), client), configuration.getName(), GenericRetry.fromConfiguration(configuration.getRetryConfiguration()), @@ -57,12 +55,10 @@ public DefaultEventHandler( } DefaultEventHandler( - CustomResourceCache customResourceCache, EventDispatcher eventDispatcher, String relatedControllerName, Retry retry, int concurrentReconciliationThreads) { - this.customResourceCache = customResourceCache; this.eventDispatcher = eventDispatcher; this.retry = retry; this.controllerName = relatedControllerName; @@ -103,7 +99,7 @@ private void executeBufferedEvents(String customResourceUid) { boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid); boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid); Optional latestCustomResource = - customResourceCache.getLatestResource(customResourceUid); + eventSourceManager.getLatestResource(customResourceUid); if (!controllerUnderExecution && newEventForResourceId && latestCustomResource.isPresent()) { setUnderExecutionProcessing(customResourceUid); @@ -235,7 +231,7 @@ private void cacheUpdatedResourceIfChanged( getUID(originalCustomResource), getVersion(customResourceAfterExecution), getVersion(originalCustomResource)); - this.customResourceCache.cacheResource( + eventSourceManager.cacheResource( customResourceAfterExecution, customResource -> getVersion(customResource).equals(originalResourceVersion) @@ -246,7 +242,6 @@ private void cacheUpdatedResourceIfChanged( private void cleanupAfterDeletedEvent(String customResourceUid) { eventSourceManager.cleanup(customResourceUid); eventBuffer.cleanup(customResourceUid); - customResourceCache.cleanup(customResourceUid); } private boolean isControllerUnderExecution(String customResourceUid) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index cdfb84778f..a3ea01461b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -6,6 +6,7 @@ import io.fabric8.kubernetes.client.dsl.Resource; import io.javaoperatorsdk.operator.api.ResourceController; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.CustomResourceCache; import io.javaoperatorsdk.operator.processing.DefaultEventHandler; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; @@ -15,6 +16,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,5 +136,24 @@ public void cleanup(String customResourceUid) { .keySet() .forEach(k -> deRegisterCustomResourceFromEventSource(k, customResourceUid)); eventSources.remove(customResourceUid); + getCache().cleanup(customResourceUid); + } + + // todo: remove + public CustomResourceCache getCache() { + final var source = + (CustomResourceEventSource) + getRegisteredEventSources().get(CUSTOM_RESOURCE_EVENT_SOURCE_NAME); + return source.getCache(); + } + + // todo: remove + public Optional getLatestResource(String customResourceUid) { + return getCache().getLatestResource(customResourceUid); + } + + // todo: remove + public void cacheResource(CustomResource resource, Predicate predicate) { + getCache().cacheResource(resource, predicate); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 0b60b7f730..a547878489 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -193,4 +193,9 @@ public void onClose(WatcherException e) { System.exit(1); } } + + // todo: remove + public CustomResourceCache getCache() { + return customResourceCache; + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index b2076723ef..947ac74be8 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -4,6 +4,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; @@ -40,11 +42,11 @@ class DefaultEventHandlerTest { private CustomResourceCache customResourceCache = new CustomResourceCache(); private DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); + private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private DefaultEventHandler defaultEventHandler = new DefaultEventHandler( - customResourceCache, eventDispatcherMock, "Test", null, @@ -52,7 +54,6 @@ class DefaultEventHandlerTest { private DefaultEventHandler defaultEventHandlerWithRetry = new DefaultEventHandler( - customResourceCache, eventDispatcherMock, "Test", GenericRetry.defaultLimitedExponentialRetry(), @@ -64,6 +65,19 @@ public void setup() { .thenReturn(retryTimerEventSourceMock); defaultEventHandler.setEventSourceManager(defaultEventSourceManagerMock); defaultEventHandlerWithRetry.setEventSourceManager(defaultEventSourceManagerMock); + + // todo: remove + when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache); + doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any()); + doCallRealMethod().when(defaultEventSourceManagerMock).cacheResource(any(), any()); + doAnswer( + invocation -> { + final var resourceId = (String) invocation.getArgument(0); + customResourceCache.cleanup(resourceId); + return null; + }) + .when(defaultEventSourceManagerMock) + .cleanup(any()); } @Test From b3e316bcc8003ff8158858363f0d18e470cc99e3 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 12 May 2021 21:16:30 +0200 Subject: [PATCH 18/18] refactor: inline method --- .../event/internal/CustomResourceEventSource.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index a547878489..871c52a8a2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -155,15 +155,14 @@ private boolean skipBecauseOfGeneration(T customResource) { if (customResource.isMarkedForDeletion()) { return false; } - return !hasGenerationAlreadyBeenProcessed(customResource); - } - private boolean hasGenerationAlreadyBeenProcessed(T resource) { - Long lastGeneration = lastGenerationProcessedSuccessfully.get(resource.getMetadata().getUid()); + // only proceed if we haven't already seen this custom resource generation + Long lastGeneration = + lastGenerationProcessedSuccessfully.get(customResource.getMetadata().getUid()); if (lastGeneration == null) { - return true; + return false; } else { - return resource.getMetadata().getGeneration() > lastGeneration; + return customResource.getMetadata().getGeneration() <= lastGeneration; } }