From c32ad765b94e62844af40c3d5a40fae17f423245 Mon Sep 17 00:00:00 2001 From: shawkins Date: Thu, 29 Apr 2021 10:28:48 -0400 Subject: [PATCH] removing the deltafifo to simplify the informer code it's not in these changes, but it will allow the cache interface to become fully type safe --- .../client/informers/cache/Cache.java | 4 - .../client/informers/cache/Controller.java | 49 +- .../client/informers/cache/DeltaFIFO.java | 531 ------------------ .../informers/cache/ProcessorListener.java | 17 +- .../informers/cache/ProcessorStore.java | 120 ++++ .../client/informers/cache/Store.java | 7 + .../impl/DefaultSharedIndexInformer.java | 55 +- .../informers/cache/ControllerTest.java | 42 +- .../client/informers/cache/DeltaFIFOTest.java | 162 ------ .../informers/cache/ProcessorStoreTest.java | 111 ++++ .../DefaultSharedIndexInformerIT.java | 4 +- 11 files changed, 277 insertions(+), 825 deletions(-) delete mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFO.java create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java delete mode 100644 kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFOTest.java create mode 100644 kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java index 67baecdd0a6..ef6240c8279 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java @@ -372,10 +372,6 @@ public void addIndexFunc(String indexName, Function> indexFunc) * @return the key */ public static String deletionHandlingMetaNamespaceKeyFunc(T object) { - if (object instanceof DeltaFIFO.DeletedFinalStateUnknown) { - DeltaFIFO.DeletedFinalStateUnknown deleteObj = (DeltaFIFO.DeletedFinalStateUnknown) object; - return deleteObj.getKey(); - } return metaNamespaceKeyFunc(object); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java index c1c3eba24df..893be488001 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java @@ -24,14 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.AbstractMap; -import java.util.Deque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -48,10 +45,7 @@ public class Controller queue; + private final Store store; private final ListerWatcher listerWatcher; @@ -59,8 +53,6 @@ public class Controller resyncFunc; - private final Consumer>> processFunc; - private final ScheduledExecutorService resyncExecutor; private ScheduledFuture resyncFuture; @@ -71,13 +63,10 @@ public class Controller apiTypeClass; - private volatile boolean running; - - public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher listerWatcher, Consumer>> processFunc, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) { - this.queue = queue; + public Controller(Class apiTypeClass, Store store, ListerWatcher listerWatcher, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) { + this.store = store; this.listerWatcher = listerWatcher; this.apiTypeClass = apiTypeClass; - this.processFunc = processFunc; this.resyncFunc = resyncFunc; if (fullResyncPeriod < 0) { throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); @@ -88,14 +77,14 @@ public Controller(Class apiTypeClass, DeltaFIFO queue, ListerWatcher // Starts one daemon thread for resync this.resyncExecutor = Executors.newSingleThreadScheduledExecutor(); - this.reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext); + this.reflector = new Reflector<>(apiTypeClass, listerWatcher, store, operationContext); } public void run() { log.info("informer#Controller: ready to run resync and reflector runnable"); // Start the resync runnable if (fullResyncPeriod > 0) { - ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc); + ResyncRunnable resyncRunnable = new ResyncRunnable(store, resyncFunc); if(!resyncExecutor.isShutdown()) { resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS); } @@ -104,16 +93,10 @@ public void run() { } try { - running = true; reflector.listAndWatch(); - - // Start the process loop - this.processLoop(); } catch (Exception exception) { log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", exception); this.eventListeners.forEach(listener -> listener.onException(exception)); - } finally { - running = false; } } @@ -135,7 +118,7 @@ public void stop() { * @return boolean value about queue sync status */ public boolean hasSynced() { - return this.queue.hasSynced(); + return this.store.hasSynced(); } /** @@ -150,30 +133,12 @@ Reflector getReflector() { return reflector; } - /** - * drains the work queue. - */ - private void processLoop() throws Exception { - while (!Thread.currentThread().isInterrupted()) { - try { - this.queue.pop(this.processFunc); - } catch (InterruptedException t) { - log.debug("DefaultController#processLoop got interrupted: {}", t.getMessage()); - Thread.currentThread().interrupt(); - return; - } catch (Exception e) { - log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e); - throw e; - } - } - } - ScheduledExecutorService getResyncExecutor() { return this.resyncExecutor; } public boolean isRunning() { - return running && this.reflector.isRunning(); + return this.reflector.isRunning(); } public long getFullResyncPeriod() { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFO.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFO.java deleted file mode 100644 index 4d290eb34d2..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFO.java +++ /dev/null @@ -1,531 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.informers.cache; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; -import java.util.function.Function; - -/** - * The DeltaFIFO inherits from an Object queue since that we're actually - * reading from it as Deltas but writing it as Kubernetes object. - * - * This is taken from official client: https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/DeltaFIFO.java - * which is ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/delta_fifo.go - */ -public class DeltaFIFO implements Store { - private static final Logger log = LoggerFactory.getLogger(DeltaFIFO.class); - - private Function keyFunc; - - /** - * Mapping deltas with key by calling keyFunc - */ - private Map>> items; - - /** - * an underlying queue storing incoming items' keys - */ - private Deque queue; - - private Store knownObjects; - - /** - * Populated is true if the first batch of items inserted by Replace() has been - * populated or Delete/Add/Update was called first. - */ - private boolean populated = false; - - /** - * It's the number of items inserted by the first call of Replace() - */ - private int initialPopulationCount; - - /** - * For thread safety - */ - private ReadWriteLock lock = new ReentrantReadWriteLock(); - - /** - * indicates if the store is empty - */ - private Condition notEmpty; - - public DeltaFIFO(Function keyFunc, Store knownObjects) { - this.keyFunc = keyFunc; - this.knownObjects = knownObjects; - this.items = new HashMap<>(); - this.queue = new LinkedList<>(); - this.notEmpty = lock.writeLock().newCondition(); - } - - /** - * Add items to the delta FIFO. - * - * @param obj object - */ - @Override - public void add(Object obj) { - lock.writeLock().lock(); - try { - populated = true; - this.queueActionLocked(DeltaType.ADDITION, obj); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Update items in delta FIFO - * - * @param obj object - */ - @Override - public void update(Object obj) { - lock.writeLock().lock(); - try { - populated = true; - this.queueActionLocked(DeltaType.UPDATION, obj); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Delete items from the delta FIFO - * - * @param obj object - */ - @Override - public void delete(Object obj) { - String id = this.keyOf(obj); - lock.writeLock().lock(); - try { - this.populated = true; - if (this.knownObjects == null) { - if (!this.items.containsKey(id)) { - // Maybe this was deleted when a relist happened - // don't provide a second report of the same deletion. - return; - } - } else { - // We only want to skip the deletion action if the object doesn't - // exist in the knownObjects and it doesn't have corresponding item - // in items. - if (this.knownObjects.getByKey(id) == null && !this.items.containsKey(id)) { - return; - } - } - this.queueActionLocked(DeltaType.DELETION, obj); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Replace the item forcibly. - * - * @param list list of objects - * @param resourceVersion resource version - */ - @Override - public void replace(List list, String resourceVersion) { - lock.writeLock().lock(); - try { - Set keys = new HashSet<>(); - for (Object obj : list) { - String key = this.keyOf(obj); - keys.add(key); - this.queueActionLocked(DeltaType.SYNCHRONIZATION, obj); - } - - if (this.knownObjects == null) { - for (Map.Entry>> entry : this.items.entrySet()) { - if (keys.contains(entry.getKey())) { - continue; - } - - Object deletedObj = null; - AbstractMap.SimpleEntry delta = entry.getValue().peekLast(); // Get newest - if (delta != null) { - deletedObj = delta.getValue(); - } - this.queueActionLocked(DeltaType.DELETION, new DeletedFinalStateUnknown(entry.getKey(), deletedObj)); - } - - if (!this.populated) { - this.populated = true; - this.initialPopulationCount = list.size(); - } - return; - } - - // Detect deletions not already in the queue. - List knownKeys = this.knownObjects.listKeys(); - int queueDeletion = 0; - for (String knownKey : knownKeys) { - if (keys.contains(knownKey)) { - continue; - } - - Object deletedObj = this.knownObjects.getByKey(knownKey); - if (deletedObj == null) { - log.warn("Key {} doesn't exist in the known object store, placing DeleteFinalStateUnknown marker without object", knownKey); - } - queueDeletion++; - this.queueActionLocked(DeltaType.DELETION, new DeletedFinalStateUnknown<>(knownKey, deletedObj)); - } - - if (!this.populated) { - this.populated = true; - this.initialPopulationCount = list.size() + queueDeletion; - } - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Resync the delta FIFO. First, it locks the queue to block any more write - * operation until it finishes all the pending items in the queue. - * - */ - @Override - public void resync() { - lock.writeLock().lock(); - try { - if (this.knownObjects == null) { - return; - } - - List keys = this.knownObjects.listKeys(); - for (String key : keys) { - syncKeyLocked(key); - } - } finally { - lock.writeLock().unlock(); - } - } - - /** - * List keys list. - * - * @return the list - */ - @Override - public List listKeys() { - lock.readLock().lock(); - try { - List keyList = new ArrayList<>(items.size()); - for (Map.Entry>> entry : items.entrySet()) { - keyList.add(entry.getKey()); - } - return keyList; - } finally { - lock.readLock().unlock(); - } - } - - /** - * Get object - * - * @param obj the object - * @return the object - */ - @Override - public Object get(Object obj) { - String key = this.keyOf(obj); - return this.getByKey(key); - } - - /** - * Gets by key - * - * @param key specific key - * @return the object - */ - @Override - public Deque> getByKey(String key) { - lock.readLock().lock(); - try { - Deque> deltas = this.items.get(key); - if (deltas != null) { - // return a shallow copy - return new LinkedList<>(deltas); - } - } finally { - lock.readLock().unlock(); - } - return null; - } - - /** - * List objects - * - * @return the list - */ - @Override - public List list() { - lock.readLock().lock(); - List objects = new ArrayList<>(); - try { - for (Map.Entry>> entry : items.entrySet()) { - Deque> copiedDeltas = new LinkedList<>(entry.getValue()); - objects.add(copiedDeltas); - } - } finally { - lock.readLock().unlock(); - } - return objects; - } - - /** - * Pop Deltas - * - * @param func the func - * @return the deltas - * @throws InterruptedException interruption exception - */ - public Deque> pop(Consumer>> func) throws InterruptedException { - lock.writeLock().lock(); - try { - while (true) { - while (queue.isEmpty()) { - notEmpty.await(); - } - - // There should be data now - String id = this.queue.removeFirst(); - if (this.initialPopulationCount > 0) { - this.initialPopulationCount--; - } - if (!this.items.containsKey(id)) { - // Item may have been deleted subsequently - continue; - } - - Deque> deltas = this.items.get(id); - this.items.remove(id); - func.accept(deltas); - // Don't make any copyDeltas here - return deltas; - } - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Has synced boolean - * - * @return the boolean - */ - public boolean hasSynced() { - lock.readLock().lock(); - try { - return this.populated && this.initialPopulationCount == 0; - } finally { - lock.readLock().unlock(); - } - } - - @Override - public void isPopulated(boolean isPopulated) { - lock.writeLock().lock(); - try { - this.populated = isPopulated; - } finally { - lock.writeLock().unlock(); - } - } - - - /** - * Exposes f's keyFunc, but also detects the key of a Deltas object or - * DeletedFinalStateUnknown objects. - * - * @param obj object - * @return key of deltas object - */ - private String keyOf(Object obj) { - Object innerObj = obj; - if (obj instanceof Deque) { - Deque> deltas = (Deque>) obj; - if (deltas.size() == 0) { - throw new NoSuchElementException("0 length deltas object; can't get key"); - } - innerObj = deltas.peekLast().getValue(); - } - if (innerObj instanceof DeletedFinalStateUnknown) { - return ((DeletedFinalStateUnknown) innerObj).key; - } - return keyFunc.apply((T) innerObj); - } - - - /** - * Appends to the delta list for the object. Caller must hold the lock. - * - * @param actionType action type - * @param obj object - */ - private void queueActionLocked(DeltaType actionType, Object obj) { - String id = this.keyOf(obj); - - Deque> deltas = items.get(id); - if (deltas == null) { - Deque> deltaList = new LinkedList<>(); - deltaList.add(new AbstractMap.SimpleEntry<>(actionType, obj)); - deltas = new LinkedList<>(deltaList); - } else { - deltas.add(new AbstractMap.SimpleEntry<>(actionType, obj)); - } - - Deque> combinedDeltaList = - combineDeltas((LinkedList>) deltas); - - boolean exist = items.containsKey(id); - if (combinedDeltaList != null && combinedDeltaList.size() > 0) { - if (!exist) { - this.queue.add(id); - } - this.items.put(id, new LinkedList<>(combinedDeltaList)); - notEmpty.signalAll(); - } else { - this.items.remove(id); - } - } - - /** - * Add Sync delta. - * Caller must hold the lock. - * - * @param key key for resource - */ - private void syncKeyLocked(String key) { - T obj = this.knownObjects.getByKey(key); - if (obj == null) { - return; - } - - String id = this.keyOf(obj); - Deque> deltas = this.items.get(id); - if (deltas != null && !(deltas == null || deltas.isEmpty())) { - return; - } - this.queueActionLocked(DeltaType.SYNCHRONIZATION, obj); - } - - private Deque> combineDeltas(LinkedList> deltas) { - if (deltas.size() < 2) { - return deltas; - } - - int size = deltas.size(); - AbstractMap.SimpleEntry d1 = deltas.peekLast(); - AbstractMap.SimpleEntry d2 = deltas.get(size - 2); - AbstractMap.SimpleEntry out = isDuplicate(d1, d2); - if (out != null) { - Deque> newDeltas = new LinkedList<>(); - newDeltas.addAll(deltas.subList(0, size - 2)); - newDeltas.add(out); - return newDeltas; - } - return deltas; - } - - /** - * Keep the one with the most information if both are deletions. - * - * @param d1 the most one - * @param d2 the elder one - * @return the most one - */ - private AbstractMap.SimpleEntry isDuplicate(AbstractMap.SimpleEntry d1, AbstractMap.SimpleEntry d2) { - AbstractMap.SimpleEntry deletionDelta = isDeletionDup(d1, d2); - if (deletionDelta != null) { - return deletionDelta; - } - return null; - } - - /** - * Keep the one with the most information if both are deletions. - * - * @param d1 the most one - * @param d2 the elder one - * @return the most one - */ - private AbstractMap.SimpleEntry isDeletionDup(AbstractMap.SimpleEntry d1, AbstractMap.SimpleEntry d2) { - if (!d1.getKey().equals(DeltaType.DELETION) || !d2.getKey().equals(DeltaType.DELETION)) { - return null; - } - - Object obj = d2.getValue(); - - return (obj instanceof DeletedFinalStateUnknown) ? d1 : d2; - } - - Map>> getItems() { return items; } - - /** - * DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where - * an object was deleted but the watch deletion event was missed. In this - * case we don't know the final resting state of the object, so there is a - * chance the included `Obj` is stale. - * - * @param object - */ - public static final class DeletedFinalStateUnknown { - private String key; - private T obj; - - DeletedFinalStateUnknown(String key, T obj) { - this.key = key; - this.obj = obj; - } - - String getKey() { return key; } - - public T getObj() { return obj; } - } - - - public enum DeltaType { - ADDITION, - - UPDATION, - - DELETION, - - SYNCHRONIZATION - } -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java index 2b0a41f70a2..bf8c4ade3b5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java @@ -27,8 +27,7 @@ /** * ProcessorListener implements Runnable interface. It's supposed to run in background - * and actually executes its event handler on notification. Note that it allows 1000 - * pending notification at maximum. + * and actually executes its event handler on notification. * * This has been taken from official client: https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/ProcessorListener.java * which has been ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go#L570 @@ -120,17 +119,21 @@ public void handle(ResourceEventHandler resourceEventHandler) { } public static final class DeleteNotification extends Notification { + + private boolean unknownFinalState; + public DeleteNotification(T oldObject) { + this(oldObject, false); + } + + public DeleteNotification(T oldObject, boolean unknownFinalState) { super(oldObject, null); + this.unknownFinalState = unknownFinalState; } @Override public void handle(ResourceEventHandler resourceEventHandler) { - if (getOldObject() instanceof DeltaFIFO.DeletedFinalStateUnknown) { - resourceEventHandler.onDelete(((DeltaFIFO.DeletedFinalStateUnknown) getOldObject()).getObj(), true); - } else { - resourceEventHandler.onDelete(getOldObject(), false); - } + resourceEventHandler.onDelete(getOldObject(), unknownFinalState); } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java new file mode 100644 index 00000000000..f3e82deefaa --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStore.java @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers.cache; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Wraps a {@link Store} to distribute events related to changes and syncs + * @param + */ +public class ProcessorStore implements Store { + + private Store actualStore; + private SharedProcessor processor; + private volatile boolean populated; + + public ProcessorStore(Store actualStore, SharedProcessor processor) { + this.actualStore = actualStore; + this.processor = processor; + } + + @Override + public void add(T obj) { + update(obj, false); + } + + @Override + public void update(T obj) { + update(obj, false); + } + + private synchronized void update(T obj, boolean sync) { + Object oldObj = this.actualStore.get(obj); + if (oldObj != null) { + this.actualStore.update(obj); + this.processor.distribute(new ProcessorListener.UpdateNotification(oldObj, obj), sync); + } else { + this.actualStore.add(obj); + this.processor.distribute(new ProcessorListener.AddNotification(obj), sync); + } + } + + @Override + public synchronized void delete(T obj) { + Object oldObj = this.actualStore.get(obj); + if (oldObj != null) { + this.actualStore.delete(obj); + this.processor.distribute(new ProcessorListener.DeleteNotification(obj, false), false); + } + } + + @Override + public List list() { + return actualStore.list(); + } + + @Override + public List listKeys() { + return actualStore.listKeys(); + } + + @Override + public Object get(T object) { + return actualStore.get(object); + } + + @Override + public T getByKey(String key) { + return actualStore.getByKey(key); + } + + @Override + public synchronized void replace(List list, String resourceVersion) { + Set keys = new HashSet<>(); + for (T obj : list) { + keys.add(Cache.metaNamespaceKeyFunc(obj)); + update(obj, true); + } + for (T obj : actualStore.list()) { + if (!keys.contains(Cache.metaNamespaceKeyFunc(obj))) { + this.processor.distribute(new ProcessorListener.DeleteNotification(obj, true), false); + } + } + actualStore.replace(list, resourceVersion); + populated = true; + } + + @Override + public void resync() { + this.actualStore.list() + .forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification(i, i), true)); + } + + @Override + public void isPopulated(boolean isPopulated) { + this.populated = isPopulated; + } + + @Override + public boolean hasSynced() { + return populated; + } + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Store.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Store.java index 6a2460664a9..741d25426be 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Store.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Store.java @@ -105,4 +105,11 @@ public interface Store { */ void isPopulated(boolean isPopulated); + /** + * true if synced + */ + default boolean hasSynced() { + return true; + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index e4378466a81..136ce53888d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -23,16 +23,14 @@ import io.fabric8.kubernetes.client.informers.SharedInformerEventListener; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.informers.cache.Controller; -import io.fabric8.kubernetes.client.informers.cache.DeltaFIFO; import io.fabric8.kubernetes.client.informers.cache.Indexer; import io.fabric8.kubernetes.client.informers.cache.ProcessorListener; +import io.fabric8.kubernetes.client.informers.cache.ProcessorStore; import io.fabric8.kubernetes.client.informers.cache.SharedProcessor; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.AbstractMap; -import java.util.Deque; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; @@ -68,11 +66,11 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.defaultEventHandlerResyncPeriod = resyncPeriod; this.processor = new SharedProcessor<>(); - this.indexer = new Cache(); + this.indexer = new Cache<>(); + + ProcessorStore processorStore = new ProcessorStore<>(this.indexer, this.processor); - DeltaFIFO fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer); - - this.controller = new Controller<>(apiTypeClass, fifo, listerWatcher, this::handleDeltas, processor::shouldResync, resyncCheckPeriodMillis, context, eventListeners); + this.controller = new Controller<>(apiTypeClass, processorStore, listerWatcher, processor::shouldResync, resyncCheckPeriodMillis, context, eventListeners); controllerThread = new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName()); } @@ -163,49 +161,6 @@ public boolean hasSynced() { return controller != null && this.controller.hasSynced(); } - /** - * Handles deltas and call processor distribute - * - * @param deltas deltas - */ - private void handleDeltas(Deque> deltas) { - if (deltas == null || deltas.isEmpty()) { - return; - } - - // from oldest to newest - for (AbstractMap.SimpleEntry delta : deltas) { - DeltaFIFO.DeltaType deltaType = delta.getKey(); - - switch (deltaType) { - case SYNCHRONIZATION: - case ADDITION: - case UPDATION: - boolean isSync = (deltaType == DeltaFIFO.DeltaType.SYNCHRONIZATION); - Object oldObj = this.indexer.get((T) delta.getValue()); - if (oldObj != null) { - this.indexer.update((T) delta.getValue()); - this.processor.distribute(new ProcessorListener.UpdateNotification(oldObj, delta.getValue()), isSync); - } else { - this.indexer.add((T) delta.getValue()); - this.processor.distribute(new ProcessorListener.AddNotification(delta.getValue()), isSync); - } - break; - case DELETION: - if (!(delta.getValue() instanceof DeltaFIFO.DeletedFinalStateUnknown)) { - this.indexer.delete((T) delta.getValue()); - } else { - T obj = (T)((DeltaFIFO.DeletedFinalStateUnknown)delta.getValue()).getObj(); - if (obj != null) { - this.indexer.delete(obj); - } - } - this.processor.distribute(new ProcessorListener.DeleteNotification(delta.getValue()), false); - break; - } - } - } - @Override public void addIndexers(Map>> indexers) { if (started) { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java index 7a5c073d30e..0482c22790d 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java @@ -38,7 +38,7 @@ import io.fabric8.kubernetes.client.informers.SharedInformerEventListener; class ControllerTest { - private DeltaFIFO deltaFIFO = Mockito.mock(DeltaFIFO.class, Mockito.RETURNS_DEEP_STUBS); + private Store store = Mockito.mock(Store.class, Mockito.RETURNS_DEEP_STUBS); private abstract class AbstractPodListerWatcher implements ListerWatcher {}; private ListerWatcher listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS); @@ -49,8 +49,7 @@ private abstract class AbstractPodListerWatcher implements ListerWatcher controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> true, 1000L, operationContext, eventListeners); @@ -61,8 +60,7 @@ void testControllerCreationWithResyncPeriodMoreThanZero() { @Test @DisplayName("Controller initialized with resync period less than zero should throw exception") void testControllerCreationWithResyncPeriodLessThanZero() { - assertThrows(IllegalArgumentException.class, () -> new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + assertThrows(IllegalArgumentException.class, () -> new Controller<>(Pod.class, store, listerWatcher, () -> true, -1000L, operationContext, eventListeners)); } @@ -71,8 +69,7 @@ void testControllerCreationWithResyncPeriodLessThanZero() { @DisplayName("Controller initialized with resync period 0 should use provided resync period") void testControllerCreationWithResyncPeriodZero() { // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> true, 0L, operationContext, eventListeners); @@ -84,8 +81,7 @@ void testControllerCreationWithResyncPeriodZero() { @DisplayName("Controller stop shut downs/cancels all executor services") void testStop() { // Given - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> true, 1000L, operationContext, eventListeners); @@ -99,8 +95,8 @@ void testStop() { @DisplayName("Controller initialized with resync period should have synced") void testControllerHasSync() throws InterruptedException { // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, () -> true, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, + () -> true, 10L, operationContext, eventListeners); Thread controllerThread = newControllerThread(controller); controllerThread.start(); @@ -120,8 +116,7 @@ void testControllerRunWithInterruptedThread() throws InterruptedException { long fullResyncPeriod = 1L; int numberOfResyncs = 1; final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> {}, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> { controllerThreadWrapper.interrupt(); return true; @@ -141,8 +136,7 @@ void testControllerRunWithInterruptedThread() throws InterruptedException { @DisplayName("Controller initialized with resync period should initialize resyncExecutor") void testControllerRunWithResyncPeriodGreaterThanZero() throws InterruptedException { // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> true, 1L, operationContext, eventListeners); Thread controllerThread = newControllerThread(controller); @@ -161,8 +155,7 @@ void testControllerRunsReyncFunctionThrowingException() throws InterruptedExcept long fullResyncPeriod = 10L; int numberOfResyncs = 10; final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> { countDown.countDown(); if( countDown.getCount() == 2 ) { @@ -186,8 +179,7 @@ void testControllerRunsReyncFunctionThrowingException() throws InterruptedExcept @DisplayName("Controller initialized with resync period should initialize resyncExecutor") void testControllerRunWithResyncPeriodGreaterThanZeroAndExecutorNotShutdown() throws InterruptedException { // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> true, 1L, operationContext, eventListeners); Executable controllerRun = newControllerRun(controller); @@ -202,8 +194,7 @@ void testControllerRunWithResyncPeriodGreaterThanZeroAndExecutorNotShutdown() th @DisplayName("Controller initialized with resync period should initialize resyncExecutor") void testControllerRunWithResyncPeriodGreaterThanZeroAndExecutorForcedShutdown() throws InterruptedException { // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> true, 1L, operationContext, eventListeners); Executable controllerRun = newControllerRun(controller); @@ -219,8 +210,7 @@ void testControllerRunWithResyncPeriodGreaterThanZeroAndExecutorForcedShutdown() @DisplayName("Controller initialized with resync period to 0 should initialize resyncExecutor") void testControllerRunWithResyncPeriodToZero() throws InterruptedException { // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> true, 0L, operationContext, eventListeners); Thread controllerThread = newControllerThread(controller); @@ -240,8 +230,7 @@ void testControllerRunsReyncFunctionExpectedNumberOfTime() throws InterruptedExc long fullResyncPeriod = 10L; int numberOfResyncs = 100; final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> {countDown.countDown(); return true;}, fullResyncPeriod, operationContext, eventListeners); @@ -264,8 +253,7 @@ void testControllerNeverRunsReyncFunctionWhenPeriodIsZero() throws InterruptedEx // Given + When int count = 10; final CountDownLatch countDown = new CountDownLatch(count); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - simpleEntries -> { }, + Controller controller = new Controller<>(Pod.class, store, listerWatcher, () -> {countDown.countDown(); return true;}, 0, operationContext, eventListeners); Executable controllerRun = newControllerRun(controller); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFOTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFOTest.java deleted file mode 100644 index 544027ee0bf..00000000000 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/DeltaFIFOTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.informers.cache; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodBuilder; -import org.junit.jupiter.api.Test; - -import java.util.*; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; - -class DeltaFIFOTest { - @Test - void testBasic() throws InterruptedException { - Deque> receivingDeltas = new LinkedList<>(); - Pod foo1 = new PodBuilder().withNewMetadata().withName("pod1").withNamespace("default").endMetadata().build(); - Cache cache = new Cache<>(); - - DeltaFIFO deltaFIFO = new DeltaFIFO<>(Cache::deletionHandlingMetaNamespaceKeyFunc, cache); - AbstractMap.SimpleEntry receivingDelta; - - // Add operation - deltaFIFO.add(foo1); - cache.add(foo1); - deltaFIFO.pop( - (deltas) -> { - AbstractMap.SimpleEntry delta = deltas.peekFirst(); - receivingDeltas.add(delta); - }); - receivingDelta = receivingDeltas.peekFirst(); - receivingDeltas.removeFirst(); - assertEquals(foo1, receivingDelta.getValue()); - assertEquals(DeltaFIFO.DeltaType.ADDITION, receivingDelta.getKey()); - - // Update operation - deltaFIFO.update(foo1); - cache.update(foo1); - deltaFIFO.pop( - (deltas) -> { - AbstractMap.SimpleEntry delta = deltas.peekFirst(); - receivingDeltas.add(delta); - }); - receivingDelta = receivingDeltas.peekFirst(); - receivingDeltas.removeFirst(); - assertEquals(foo1, receivingDelta.getValue()); - assertEquals(DeltaFIFO.DeltaType.UPDATION, receivingDelta.getKey()); - - // Delete operation - deltaFIFO.delete(foo1); - cache.delete(foo1); - deltaFIFO.pop( - (deltas) -> { - AbstractMap.SimpleEntry delta = deltas.peekFirst(); - receivingDeltas.add(delta); - }); - receivingDelta = receivingDeltas.peekFirst(); - receivingDeltas.removeFirst(); - assertEquals(foo1, receivingDelta.getValue()); - assertEquals(DeltaFIFO.DeltaType.DELETION, receivingDelta.getKey()); - - // Sync operation - deltaFIFO.replace(Arrays.asList(foo1), "0"); - cache.replace(Arrays.asList(foo1), "0"); - deltaFIFO.pop( - (deltas) -> { - AbstractMap.SimpleEntry delta = deltas.peekFirst(); - receivingDeltas.add(delta); - }); - receivingDelta = receivingDeltas.peekFirst(); - receivingDeltas.removeFirst(); - assertEquals(foo1, receivingDelta.getValue()); - assertEquals(DeltaFIFO.DeltaType.SYNCHRONIZATION, receivingDelta.getKey()); - } - - @Test - void testDeduplication() { - Pod foo1 = new PodBuilder().withNewMetadata().withName("foo1").withNamespace("default").endMetadata().build(); - Cache cache = new Cache<>(); - DeltaFIFO deltaFIFO = new DeltaFIFO<>(Cache::deletionHandlingMetaNamespaceKeyFunc, cache); - Deque> deltas; - - // Add-delete deduplication - deltaFIFO.add(foo1); - deltaFIFO.delete(foo1); - deltas = deltaFIFO.getItems().get(Cache.deletionHandlingMetaNamespaceKeyFunc(foo1)); - assertEquals(DeltaFIFO.DeltaType.DELETION, deltas.peekLast().getKey()); - assertEquals(foo1, deltas.peekLast().getValue()); - assertEquals(DeltaFIFO.DeltaType.ADDITION, deltas.peekFirst().getKey()); - assertEquals(foo1, deltas.peekFirst().getValue()); - assertEquals(2, deltas.size()); - deltaFIFO.getItems().remove(Cache.deletionHandlingMetaNamespaceKeyFunc(foo1)); - - // Add-delete-delete deduplication - deltaFIFO.add(foo1); - deltaFIFO.delete(foo1); - deltaFIFO.delete(foo1); - deltas = deltaFIFO.getItems().get(Cache.deletionHandlingMetaNamespaceKeyFunc(foo1)); - assertEquals(foo1, deltas.peekLast().getValue()); - assertEquals(DeltaFIFO.DeltaType.ADDITION, deltas.peekFirst().getKey()); - assertEquals(foo1, deltas.peekFirst().getValue()); - assertEquals(2, deltas.size()); - deltaFIFO.getItems().remove(Cache.deletionHandlingMetaNamespaceKeyFunc(foo1)); - } - - @Test - void testResync() { - Pod foo1 = new PodBuilder().withNewMetadata().withName("foo1").withNamespace("default").endMetadata().build(); - Cache cache = new Cache(); - DeltaFIFO deltaFIFO = new DeltaFIFO<>(Cache::deletionHandlingMetaNamespaceKeyFunc, cache); - - // sync after addition - cache.add(foo1); - deltaFIFO.resync(); - - Deque> deltas = deltaFIFO.getItems().get(Cache.deletionHandlingMetaNamespaceKeyFunc(foo1)); - assertEquals(1, deltas.size()); - assertEquals(foo1, deltas.peekLast().getValue()); - assertEquals(DeltaFIFO.DeltaType.SYNCHRONIZATION, deltas.peekLast().getKey()); - } - - @Test - void testReplaceWithDeleteDeltaIn() throws InterruptedException { - Pod oldPod = new PodBuilder().withNewMetadata().withNamespace("default").withName("foo1").endMetadata().build(); - Pod newPod = new PodBuilder().withNewMetadata().withNamespace("default").withName("foo2").endMetadata().build(); - - Cache mockCache = mock(Cache.class); - doReturn(oldPod).when(mockCache).getByKey(Cache.deletionHandlingMetaNamespaceKeyFunc(oldPod)); - DeltaFIFO deltaFIFO = - new DeltaFIFO<>(Cache::deletionHandlingMetaNamespaceKeyFunc, mockCache); - - deltaFIFO.delete(oldPod); - deltaFIFO.replace(Collections.singletonList(newPod), "0"); - - deltaFIFO.pop( - (deltas) -> { - assertEquals(DeltaFIFO.DeltaType.DELETION, deltas.getFirst().getKey()); - assertEquals(oldPod, deltas.getFirst().getValue()); - }); - - deltaFIFO.pop( - (deltas) -> { - assertEquals(DeltaFIFO.DeltaType.SYNCHRONIZATION, deltas.getFirst().getKey()); - assertEquals(newPod, deltas.getFirst().getValue()); - }); - } -} diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java new file mode 100644 index 00000000000..fd41b663f8e --- /dev/null +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorStoreTest.java @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.informers.cache; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.client.informers.cache.ProcessorListener.AddNotification; +import io.fabric8.kubernetes.client.informers.cache.ProcessorListener.DeleteNotification; +import io.fabric8.kubernetes.client.informers.cache.ProcessorListener.Notification; +import io.fabric8.kubernetes.client.informers.cache.ProcessorListener.UpdateNotification; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; + +public class ProcessorStoreTest { + + @Test + public void testEvents() { + ArgumentCaptor> notificationCaptor = ArgumentCaptor.forClass(Notification.class); + ArgumentCaptor syncCaptor = ArgumentCaptor.forClass(Boolean.class); + Store podStore = Mockito.mock(Store.class); + SharedProcessor processor = Mockito.mock(SharedProcessor.class); + + ProcessorStore processorStore = new ProcessorStore<>(podStore, processor); + Pod pod = new Pod(); + + // add notification + processorStore.add(pod); + + // add notification, because the pod doesn't exist in the store + processorStore.update(pod); + + // ignored + processorStore.delete(pod); + + // update notification + Mockito.when(podStore.get(pod)).thenReturn(pod); + processorStore.update(pod); + + // delete notification + processorStore.delete(pod); + + Mockito.verify(processor, Mockito.times(4)).distribute(notificationCaptor.capture(), syncCaptor.capture()); + + List> notifications = notificationCaptor.getAllValues(); + + assertThat(notifications.get(0)).isInstanceOf(AddNotification.class); + assertThat(notifications.get(1)).isInstanceOf(AddNotification.class); + assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class); + assertThat(notifications.get(3)).isInstanceOf(DeleteNotification.class); + + assertTrue(syncCaptor.getAllValues().stream().allMatch(s->!s.booleanValue())); + } + + @Test + public void testSyncEvents() { + ArgumentCaptor> notificationCaptor = ArgumentCaptor.forClass(Notification.class); + ArgumentCaptor syncCaptor = ArgumentCaptor.forClass(Boolean.class); + Store podStore = Mockito.mock(Store.class); + SharedProcessor processor = Mockito.mock(SharedProcessor.class); + + ProcessorStore processorStore = new ProcessorStore<>(podStore, processor); + + Pod pod = new PodBuilder().withNewMetadata().endMetadata().build(); + + // replace two values with an empty store + processorStore.replace(Arrays.asList(pod, pod), null); + + // resync two values + Mockito.when(podStore.list()).thenReturn(Arrays.asList(pod, pod)); + processorStore.resync(); + + // relist with deletes + processorStore.replace(Collections.emptyList(), null); + + Mockito.verify(processor, Mockito.times(6)).distribute(notificationCaptor.capture(), syncCaptor.capture()); + + List> notifications = notificationCaptor.getAllValues(); + + assertThat(notifications.get(0)).isInstanceOf(AddNotification.class); + assertThat(notifications.get(1)).isInstanceOf(AddNotification.class); + assertThat(notifications.get(2)).isInstanceOf(UpdateNotification.class); + assertThat(notifications.get(3)).isInstanceOf(UpdateNotification.class); + assertTrue(syncCaptor.getAllValues().subList(0, 4).stream().allMatch(s->s.booleanValue())); + + assertThat(notifications.get(4)).isInstanceOf(DeleteNotification.class); + assertThat(notifications.get(5)).isInstanceOf(DeleteNotification.class); + assertTrue(syncCaptor.getAllValues().subList(4, 6).stream().allMatch(s->!s.booleanValue())); + } + +} diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/DefaultSharedIndexInformerIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/DefaultSharedIndexInformerIT.java index a012b0d0816..3f8ca0fda4f 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/DefaultSharedIndexInformerIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/DefaultSharedIndexInformerIT.java @@ -22,7 +22,6 @@ import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; -import org.arquillian.cube.kubernetes.api.Session; import org.arquillian.cube.kubernetes.impl.requirement.RequiresKubernetes; import org.arquillian.cube.requirement.ArquillianConditionalRunner; import org.jboss.arquillian.test.api.ArquillianResource; @@ -33,7 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @RunWith(ArquillianConditionalRunner.class) @RequiresKubernetes @@ -79,6 +78,7 @@ public void testPodSharedIndexInformerGetsSingleUpdates() throws InterruptedExce // When informerFactory.startAllRegisteredInformers(); updateEvents.await(3 * RESYNC_PERIOD, TimeUnit.MILLISECONDS); + addEvents.await(3 * RESYNC_PERIOD, TimeUnit.MILLISECONDS); // Then assertThat(addEvents.getCount()).isZero();