diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index de473d0b8e..adc1a43f0c 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -195,6 +195,16 @@ public void sendCommand(final ProtocolCommand cmd, final String... args) { sendCommand(new CommandArguments(cmd).addObjects((Object[]) args)); } + @Experimental + public void sendCommandWithTracking(final ProtocolCommand cmd, List prefixList, final String... args) { + List list = new ArrayList<>(); + for (String prefix : prefixList) { + list.add("PREFIX"); + list.add(prefix); + } + sendCommand(new CommandArguments(cmd).addObjects((Object[]) args).addObjects(list.toArray())); + } + public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { sendCommand(new CommandArguments(cmd).addObjects((Object[]) args)); } diff --git a/src/main/java/redis/clients/jedis/ConnectionFactory.java b/src/main/java/redis/clients/jedis/ConnectionFactory.java index 7440417152..87a8031225 100644 --- a/src/main/java/redis/clients/jedis/ConnectionFactory.java +++ b/src/main/java/redis/clients/jedis/ConnectionFactory.java @@ -6,6 +6,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import redis.clients.jedis.annots.Experimental; @@ -29,6 +34,18 @@ public class ConnectionFactory implements PooledObjectFactory { private final Supplier objectMaker; private final AuthXEventListener authXEventListener; + /** + * Only one connection is maintained between a client and a server node for tracking and receiving invalidation messages. + *

+ * This is done to avoid the server sending duplicate messages to multiple connections, + * thereby reducing the CPU consumption of the server. + */ + private CacheConnection trackingConnection = null; + + /** + * The single thread executor for listening invalidation messages. + */ + private ScheduledExecutorService invalidationListeningExecutor = null; public ConnectionFactory(final HostAndPort hostAndPort) { this(hostAndPort, DefaultJedisClientConfig.builder().build(), null); @@ -42,6 +59,11 @@ public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, Cache csCache) { this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig, csCache); + if (!clientConfig.getTrackingConfig().isTrackingModeOnDefault()) { + invalidationListeningExecutor = Executors.newSingleThreadScheduledExecutor(); + // initialize tracking connection + initializeTrackingConnection(); + } } public ConnectionFactory(final JedisSocketFactory jedisSocketFactory, @@ -68,6 +90,56 @@ private ConnectionFactory(final JedisSocketFactory jedisSocketFactory, } } + /** + * Create a "tracking" connection and start tracking and listen invalidation messages periodically. + */ + @Experimental + private void initializeTrackingConnection() { + trackingConnection = new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache); + tracking(); + startInvalidationListenerThread(); + } + + /** + * Tracking on broadcasting mode. + */ + @Experimental + private void tracking() { + List trackingPrefixList = clientConfig.getTrackingConfig().getTrackingPrefixList(); + // if no prefix is set, the prefix is "". + if (trackingPrefixList == null) { + trackingPrefixList = new ArrayList<>(); + trackingPrefixList.add(""); + } + trackingConnection.sendCommandWithTracking(Protocol.Command.CLIENT, trackingPrefixList, "TRACKING", "ON", "BCAST"); + String reply = trackingConnection.getStatusCodeReply(); + if (!"OK".equals(reply)) { + throw new JedisException("Could not enable client tracking. Reply: " + reply); + } + } + + /** + * Start a scheduled task to listen for invalidation event. + */ + @Experimental + private void startInvalidationListenerThread() { + invalidationListeningExecutor.scheduleAtFixedRate(() -> { + if (trackingConnection.isBroken() || !trackingConnection.isConnected() || !trackingConnection.ping()) { + // flush cache(broadcasting mode only trackingConnection disconnect) + clientSideCache.flush(); + // create a new connection and enable tracking + try { + trackingConnection = new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache); + } catch (Exception e) { + // do something + } + tracking(); + } + trackingConnection.readPushesWithCheckingBroken(); + // period? + }, 2, 2, TimeUnit.SECONDS); + } + private Supplier connectionSupplier() { return clientSideCache == null ? () -> new Connection(jedisSocketFactory, clientConfig) : () -> new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache); diff --git a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java index 25a4737ec0..5a6723731c 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java @@ -6,6 +6,7 @@ import javax.net.ssl.SSLSocketFactory; import redis.clients.jedis.authentication.AuthXManager; +import redis.clients.jedis.csc.TrackingConfig; public final class DefaultJedisClientConfig implements JedisClientConfig { @@ -33,6 +34,8 @@ public final class DefaultJedisClientConfig implements JedisClientConfig { private final AuthXManager authXManager; + private final TrackingConfig trackingConfig; + private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.redisProtocol = builder.redisProtocol; this.connectionTimeoutMillis = builder.connectionTimeoutMillis; @@ -50,6 +53,7 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) { this.clientSetInfoConfig = builder.clientSetInfoConfig; this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas; this.authXManager = builder.authXManager; + this.trackingConfig = builder.trackingConfig; } @Override @@ -143,6 +147,11 @@ public boolean isReadOnlyForRedisClusterReplicas() { return readOnlyForRedisClusterReplicas; } + @Override + public TrackingConfig getTrackingConfig() { + return trackingConfig; + } + public static Builder builder() { return new Builder(); } @@ -175,6 +184,8 @@ public static class Builder { private AuthXManager authXManager = null; + private TrackingConfig trackingConfig = TrackingConfig.DEFAULT; + private Builder() { } @@ -297,6 +308,11 @@ public Builder authXManager(AuthXManager authXManager) { return this; } + public Builder trackingConfig(TrackingConfig trackingConfig) { + this.trackingConfig = trackingConfig; + return this; + } + public Builder from(JedisClientConfig instance) { this.redisProtocol = instance.getRedisProtocol(); this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis(); @@ -314,6 +330,7 @@ public Builder from(JedisClientConfig instance) { this.clientSetInfoConfig = instance.getClientSetInfoConfig(); this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas(); this.authXManager = instance.getAuthXManager(); + this.trackingConfig = instance.getTrackingConfig(); return this; } } @@ -375,6 +392,7 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { } builder.authXManager(copy.getAuthXManager()); + builder.trackingConfig(copy.getTrackingConfig()); return builder.build(); } diff --git a/src/main/java/redis/clients/jedis/JedisClientConfig.java b/src/main/java/redis/clients/jedis/JedisClientConfig.java index ce7fd82de4..0ff8ced74f 100644 --- a/src/main/java/redis/clients/jedis/JedisClientConfig.java +++ b/src/main/java/redis/clients/jedis/JedisClientConfig.java @@ -6,6 +6,7 @@ import javax.net.ssl.SSLSocketFactory; import redis.clients.jedis.authentication.AuthXManager; +import redis.clients.jedis.csc.TrackingConfig; public interface JedisClientConfig { @@ -115,4 +116,12 @@ default boolean isReadOnlyForRedisClusterReplicas() { default ClientSetInfoConfig getClientSetInfoConfig() { return ClientSetInfoConfig.DEFAULT; } + + /** + * Modify Tracking config(tracking mode, tracking prefixes) + * @return Tracking config + */ + default TrackingConfig getTrackingConfig() { + return TrackingConfig.DEFAULT; + } } diff --git a/src/main/java/redis/clients/jedis/csc/CacheConnection.java b/src/main/java/redis/clients/jedis/csc/CacheConnection.java index f157d95a94..09ca39978e 100644 --- a/src/main/java/redis/clients/jedis/csc/CacheConnection.java +++ b/src/main/java/redis/clients/jedis/csc/CacheConnection.java @@ -14,6 +14,10 @@ public class CacheConnection extends Connection { + /** + * tracking mode(true:default; false:broadcast) + */ + private boolean isDefaultMode = true; private final Cache cache; private ReentrantLock lock; private static final String REDIS = "redis"; @@ -21,7 +25,6 @@ public class CacheConnection extends Connection { public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, Cache cache) { super(socketFactory, clientConfig); - if (protocol != RedisProtocol.RESP3) { throw new JedisException("Client side caching is only supported with RESP3."); } @@ -33,7 +36,11 @@ public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig } } this.cache = Objects.requireNonNull(cache); - initializeClientSideCache(); + this.isDefaultMode = clientConfig.getTrackingConfig().isTrackingModeOnDefault(); + // only default mode + if (isDefaultMode) { + initializeClientSideCache(); + } } @Override @@ -66,7 +73,10 @@ protected void protocolReadPushes(RedisInputStream inputStream) { @Override public void disconnect() { super.disconnect(); - cache.flush(); + if (isDefaultMode) { + // The cache is cleared when the connection is disconnected only in default mode + cache.flush(); + } } @Override @@ -79,6 +89,11 @@ public T executeCommand(final CommandObject commandObject) { CacheEntry cacheEntry = cache.get(cacheKey); if (cacheEntry != null) { // (probable) CACHE HIT !! + if (!isDefaultMode) { + // broadcast mode returns directly + cache.getStats().hit(); + return cacheEntry.getValue(); + } cacheEntry = validateEntry(cacheEntry); if (cacheEntry != null) { // CACHE HIT confirmed !!! diff --git a/src/main/java/redis/clients/jedis/csc/TrackingConfig.java b/src/main/java/redis/clients/jedis/csc/TrackingConfig.java new file mode 100644 index 0000000000..675ee00dc4 --- /dev/null +++ b/src/main/java/redis/clients/jedis/csc/TrackingConfig.java @@ -0,0 +1,37 @@ +package redis.clients.jedis.csc; + +import java.util.Collections; +import java.util.List; + +public final class TrackingConfig { + + /** + * tracking mode(true:default; false:broadcast) + */ + private final boolean trackingModeOnDefault; + + /** + * tracking prefix list(only broadcast mode) + */ + private final List trackingPrefixList; + + public TrackingConfig(List trackingPrefixList, boolean trackingModeOnDefault) { + this.trackingPrefixList = trackingPrefixList; + this.trackingModeOnDefault = trackingModeOnDefault; + } + + public boolean isTrackingModeOnDefault() { + return trackingModeOnDefault; + } + + public List getTrackingPrefixList() { + return trackingPrefixList; + } + + public static final TrackingConfig DEFAULT = new TrackingConfig(null, true); + + /** + * prefix: "" + */ + public static final TrackingConfig BROADCAST = new TrackingConfig(Collections.emptyList(), false); +} diff --git a/src/test/java/redis/clients/jedis/csc/CSCTest.java b/src/test/java/redis/clients/jedis/csc/CSCTest.java new file mode 100644 index 0000000000..294c15370d --- /dev/null +++ b/src/test/java/redis/clients/jedis/csc/CSCTest.java @@ -0,0 +1,50 @@ +package redis.clients.jedis.csc; + +import org.junit.Test; +import redis.clients.jedis.DefaultJedisClientConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisClientConfig; +import redis.clients.jedis.UnifiedJedis; +import redis.clients.jedis.commands.ProtocolCommand; + +import java.util.List; + +public class CSCTest { + private static JedisClientConfig clientConfig; + private static CacheConfig cacheConfig; + + static { + clientConfig = DefaultJedisClientConfig.builder() + .resp3() // RESP3 protocol is required for client-side caching + .trackingConfig(TrackingConfig.BROADCAST) + .build(); + + cacheConfig = getCacheConfig(); + } + + private static CacheConfig getCacheConfig() { + + // This is a simple cacheable implementation that ignores keys starting with "ignore_me" + Cacheable cacheable = new DefaultCacheable() { + @Override + public boolean isCacheable(ProtocolCommand command, List keys) { + return isDefaultCacheableCommand(command); + } + }; + + // Create a cache with a maximum size of 10000 entries + return CacheConfig.builder() + .maxSize(10000) + .cacheable(cacheable) + .build(); + } + + @Test + public void testTrackingOnBroadcastMode() { + HostAndPort node = HostAndPort.from("127.0.0.1:6379"); + try (UnifiedJedis client = new UnifiedJedis(node, clientConfig, CacheFactory.getCache(cacheConfig))) { + String a1 = client.get("a"); + String a2 = client.get("a"); + } + } +}