Skip to content

Commit

Permalink
removing the deltafifo to simplify the informer code
Browse files Browse the repository at this point in the history
it's not in these changes, but it will allow the cache interface to
become fully type safe
  • Loading branch information
shawkins committed Apr 29, 2021
1 parent 5fe1b0d commit c32ad76
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 825 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,6 @@ public void addIndexFunc(String indexName, Function<T, List<String>> indexFunc)
* @return the key
*/
public static <T> String deletionHandlingMetaNamespaceKeyFunc(T object) {
if (object instanceof DeltaFIFO.DeletedFinalStateUnknown) {
DeltaFIFO.DeletedFinalStateUnknown deleteObj = (DeltaFIFO.DeletedFinalStateUnknown) object;
return deleteObj.getKey();
}
return metaNamespaceKeyFunc(object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
Expand All @@ -48,19 +45,14 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<
*/
private final long fullResyncPeriod;

/**
* Queue stores deltas produced by reflector.
*/
private final DeltaFIFO<T> queue;
private final Store<T> store;

private final ListerWatcher<T, L> listerWatcher;

private Reflector<T, L> reflector;

private final Supplier<Boolean> resyncFunc;

private final Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc;

private final ScheduledExecutorService resyncExecutor;

private ScheduledFuture resyncFuture;
Expand All @@ -71,13 +63,10 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private final Class<T> apiTypeClass;

private volatile boolean running;

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this.queue = queue;
public Controller(Class<T> apiTypeClass, Store<T> store, ListerWatcher<T, L> listerWatcher, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this.store = store;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
this.processFunc = processFunc;
this.resyncFunc = resyncFunc;
if (fullResyncPeriod < 0) {
throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value");
Expand All @@ -88,14 +77,14 @@ public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L>

// Starts one daemon thread for resync
this.resyncExecutor = Executors.newSingleThreadScheduledExecutor();
this.reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext);
this.reflector = new Reflector<>(apiTypeClass, listerWatcher, store, operationContext);
}

public void run() {
log.info("informer#Controller: ready to run resync and reflector runnable");
// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
ResyncRunnable resyncRunnable = new ResyncRunnable(store, resyncFunc);
if(!resyncExecutor.isShutdown()) {
resyncFuture = resyncExecutor.scheduleAtFixedRate(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
}
Expand All @@ -104,16 +93,10 @@ public void run() {
}

try {
running = true;
reflector.listAndWatch();

// Start the process loop
this.processLoop();
} catch (Exception exception) {
log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", exception);
this.eventListeners.forEach(listener -> listener.onException(exception));
} finally {
running = false;
}
}

Expand All @@ -135,7 +118,7 @@ public void stop() {
* @return boolean value about queue sync status
*/
public boolean hasSynced() {
return this.queue.hasSynced();
return this.store.hasSynced();
}

/**
Expand All @@ -150,30 +133,12 @@ Reflector<T, L> getReflector() {
return reflector;
}

/**
* drains the work queue.
*/
private void processLoop() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
try {
this.queue.pop(this.processFunc);
} catch (InterruptedException t) {
log.debug("DefaultController#processLoop got interrupted: {}", t.getMessage());
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e);
throw e;
}
}
}

ScheduledExecutorService getResyncExecutor() {
return this.resyncExecutor;
}

public boolean isRunning() {
return running && this.reflector.isRunning();
return this.reflector.isRunning();
}

public long getFullResyncPeriod() {
Expand Down
Loading

0 comments on commit c32ad76

Please # to comment.