From 72f1d16b9abf0799aceb78484f492da582ad96d7 Mon Sep 17 00:00:00 2001 From: Nicolas Filotto Date: Tue, 14 Jan 2025 19:29:17 +0100 Subject: [PATCH] CAMEL-21614: camel-core - Prevent cache change miss on queue swap --- .../camel/support/cache/SimpleLRUCache.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index b6996a3f21526..7c1b76e601af4 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -49,6 +51,10 @@ public class SimpleLRUCache extends ConcurrentHashMap { * The flag indicating that an eviction process is in progress. */ private final AtomicBoolean eviction = new AtomicBoolean(); + /** + * The lock to prevent the addition of changes during the swap of queue of changes. + */ + private final ReadWriteLock swapLock = new ReentrantReadWriteLock(); /** * The maximum cache size. */ @@ -84,7 +90,13 @@ private V addChange(OperationContext context, Function entry = Map.entry(key, value); + swapLock.readLock().lock(); + try { + lastChanges.get().add(entry); + } finally { + swapLock.readLock().unlock(); + } return value; } @@ -269,7 +281,13 @@ private Entry nextOldestChange() { */ private void compressChanges() { Deque> newChanges = new ConcurrentLinkedDeque<>(); - Deque> currentChanges = lastChanges.getAndSet(newChanges); + Deque> currentChanges; + swapLock.writeLock().lock(); + try { + currentChanges = lastChanges.getAndSet(newChanges); + } finally { + swapLock.writeLock().unlock(); + } Set keys = new HashSet<>(); Entry entry; while ((entry = currentChanges.pollLast()) != null) {