From 4226f0fdef70d67f4e634ce4370af9fe25b0ce53 Mon Sep 17 00:00:00 2001 From: Nicolas Filotto Date: Wed, 15 Jan 2025 10:03:48 +0100 Subject: [PATCH] CAMEL-21614: camel-core - Prevent cache change miss on queue swap (#16820) ## Motivation In some specific use cases, the eviction of the entries never ends ## Modifications: * Improve the way to swap the queue of change to avoid the race condition that could cause cache changes miss by leveraging a `ReadWriteLock` to only prevent change additions during the swap --- .../camel/support/cache/SimpleLRUCache.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java index b6996a3f21526..7c1b76e601af4 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/SimpleLRUCache.java @@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -49,6 +51,10 @@ public class SimpleLRUCache extends ConcurrentHashMap { * The flag indicating that an eviction process is in progress. */ private final AtomicBoolean eviction = new AtomicBoolean(); + /** + * The lock to prevent the addition of changes during the swap of queue of changes. + */ + private final ReadWriteLock swapLock = new ReentrantReadWriteLock(); /** * The maximum cache size. */ @@ -84,7 +90,13 @@ private V addChange(OperationContext context, Function entry = Map.entry(key, value); + swapLock.readLock().lock(); + try { + lastChanges.get().add(entry); + } finally { + swapLock.readLock().unlock(); + } return value; } @@ -269,7 +281,13 @@ private Entry nextOldestChange() { */ private void compressChanges() { Deque> newChanges = new ConcurrentLinkedDeque<>(); - Deque> currentChanges = lastChanges.getAndSet(newChanges); + Deque> currentChanges; + swapLock.writeLock().lock(); + try { + currentChanges = lastChanges.getAndSet(newChanges); + } finally { + swapLock.writeLock().unlock(); + } Set keys = new HashSet<>(); Entry entry; while ((entry = currentChanges.pollLast()) != null) {