Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Support CSC(client-side caching) on broadcasting mode #4057

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@ public void sendCommand(final ProtocolCommand cmd, final String... args) {
sendCommand(new CommandArguments(cmd).addObjects((Object[]) args));
}

@Experimental
public void sendCommandWithTracking(final ProtocolCommand cmd, List<String> prefixList, final String... args) {
List<String> list = new ArrayList<>();
for (String prefix : prefixList) {
list.add("PREFIX");
list.add(prefix);
}
sendCommand(new CommandArguments(cmd).addObjects((Object[]) args).addObjects(list.toArray()));
}

public void sendCommand(final ProtocolCommand cmd, final byte[]... args) {
sendCommand(new CommandArguments(cmd).addObjects((Object[]) args));
}
Expand Down
72 changes: 72 additions & 0 deletions src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import redis.clients.jedis.annots.Experimental;
Expand All @@ -29,6 +34,18 @@ public class ConnectionFactory implements PooledObjectFactory<Connection> {
private final Supplier<Connection> objectMaker;

private final AuthXEventListener authXEventListener;
/**
* Only one connection is maintained between a client and a server node for tracking and receiving invalidation messages.
* <p>
* This is done to avoid the server sending duplicate messages to multiple connections,
* thereby reducing the CPU consumption of the server.
*/
private CacheConnection trackingConnection = null;

/**
* The single thread executor for listening invalidation messages.
*/
private ScheduledExecutorService invalidationListeningExecutor = null;

public ConnectionFactory(final HostAndPort hostAndPort) {
this(hostAndPort, DefaultJedisClientConfig.builder().build(), null);
Expand All @@ -42,6 +59,11 @@ public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig,
Cache csCache) {
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig, csCache);
if (!clientConfig.getTrackingConfig().isTrackingModeOnDefault()) {
invalidationListeningExecutor = Executors.newSingleThreadScheduledExecutor();
// initialize tracking connection
initializeTrackingConnection();
}
}

public ConnectionFactory(final JedisSocketFactory jedisSocketFactory,
Expand All @@ -68,6 +90,56 @@ private ConnectionFactory(final JedisSocketFactory jedisSocketFactory,
}
}

/**
* Create a "tracking" connection and start tracking and listen invalidation messages periodically.
*/
@Experimental
private void initializeTrackingConnection() {
trackingConnection = new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache);
tracking();
startInvalidationListenerThread();
}

/**
* Tracking on broadcasting mode.
*/
@Experimental
private void tracking() {
List<String> trackingPrefixList = clientConfig.getTrackingConfig().getTrackingPrefixList();
// if no prefix is set, the prefix is "".
if (trackingPrefixList == null) {
trackingPrefixList = new ArrayList<>();
trackingPrefixList.add("");
}
trackingConnection.sendCommandWithTracking(Protocol.Command.CLIENT, trackingPrefixList, "TRACKING", "ON", "BCAST");
String reply = trackingConnection.getStatusCodeReply();
if (!"OK".equals(reply)) {
throw new JedisException("Could not enable client tracking. Reply: " + reply);
}
}

/**
* Start a scheduled task to listen for invalidation event.
*/
@Experimental
private void startInvalidationListenerThread() {
invalidationListeningExecutor.scheduleAtFixedRate(() -> {
if (trackingConnection.isBroken() || !trackingConnection.isConnected() || !trackingConnection.ping()) {
// flush cache(broadcasting mode only trackingConnection disconnect)
clientSideCache.flush();
// create a new connection and enable tracking
try {
trackingConnection = new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache);
} catch (Exception e) {
// do something
}
tracking();
}
trackingConnection.readPushesWithCheckingBroken();
// period?
}, 2, 2, TimeUnit.SECONDS);
}

private Supplier<Connection> connectionSupplier() {
return clientSideCache == null ? () -> new Connection(jedisSocketFactory, clientConfig)
: () -> new CacheConnection(jedisSocketFactory, clientConfig, clientSideCache);
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/redis/clients/jedis/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import javax.net.ssl.SSLSocketFactory;

import redis.clients.jedis.authentication.AuthXManager;
import redis.clients.jedis.csc.TrackingConfig;

public final class DefaultJedisClientConfig implements JedisClientConfig {

Expand Down Expand Up @@ -33,6 +34,8 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {

private final AuthXManager authXManager;

private final TrackingConfig trackingConfig;

private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
this.redisProtocol = builder.redisProtocol;
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
Expand All @@ -50,6 +53,7 @@ private DefaultJedisClientConfig(DefaultJedisClientConfig.Builder builder) {
this.clientSetInfoConfig = builder.clientSetInfoConfig;
this.readOnlyForRedisClusterReplicas = builder.readOnlyForRedisClusterReplicas;
this.authXManager = builder.authXManager;
this.trackingConfig = builder.trackingConfig;
}

@Override
Expand Down Expand Up @@ -143,6 +147,11 @@ public boolean isReadOnlyForRedisClusterReplicas() {
return readOnlyForRedisClusterReplicas;
}

@Override
public TrackingConfig getTrackingConfig() {
return trackingConfig;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -175,6 +184,8 @@ public static class Builder {

private AuthXManager authXManager = null;

private TrackingConfig trackingConfig = TrackingConfig.DEFAULT;

private Builder() {
}

Expand Down Expand Up @@ -297,6 +308,11 @@ public Builder authXManager(AuthXManager authXManager) {
return this;
}

public Builder trackingConfig(TrackingConfig trackingConfig) {
this.trackingConfig = trackingConfig;
return this;
}

public Builder from(JedisClientConfig instance) {
this.redisProtocol = instance.getRedisProtocol();
this.connectionTimeoutMillis = instance.getConnectionTimeoutMillis();
Expand All @@ -314,6 +330,7 @@ public Builder from(JedisClientConfig instance) {
this.clientSetInfoConfig = instance.getClientSetInfoConfig();
this.readOnlyForRedisClusterReplicas = instance.isReadOnlyForRedisClusterReplicas();
this.authXManager = instance.getAuthXManager();
this.trackingConfig = instance.getTrackingConfig();
return this;
}
}
Expand Down Expand Up @@ -375,6 +392,7 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
}

builder.authXManager(copy.getAuthXManager());
builder.trackingConfig(copy.getTrackingConfig());

return builder.build();
}
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/JedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import javax.net.ssl.SSLSocketFactory;

import redis.clients.jedis.authentication.AuthXManager;
import redis.clients.jedis.csc.TrackingConfig;

public interface JedisClientConfig {

Expand Down Expand Up @@ -115,4 +116,12 @@ default boolean isReadOnlyForRedisClusterReplicas() {
default ClientSetInfoConfig getClientSetInfoConfig() {
return ClientSetInfoConfig.DEFAULT;
}

/**
* Modify Tracking config(tracking mode, tracking prefixes)
* @return Tracking config
*/
default TrackingConfig getTrackingConfig() {
return TrackingConfig.DEFAULT;
}
}
21 changes: 18 additions & 3 deletions src/main/java/redis/clients/jedis/csc/CacheConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@

public class CacheConnection extends Connection {

/**
* tracking mode(true:default; false:broadcast)
*/
private boolean isDefaultMode = true;
private final Cache cache;
private ReentrantLock lock;
private static final String REDIS = "redis";
private static final String MIN_REDIS_VERSION = "7.4";

public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig, Cache cache) {
super(socketFactory, clientConfig);

if (protocol != RedisProtocol.RESP3) {
throw new JedisException("Client side caching is only supported with RESP3.");
}
Expand All @@ -33,7 +36,11 @@ public CacheConnection(final JedisSocketFactory socketFactory, JedisClientConfig
}
}
this.cache = Objects.requireNonNull(cache);
initializeClientSideCache();
this.isDefaultMode = clientConfig.getTrackingConfig().isTrackingModeOnDefault();
// only default mode
if (isDefaultMode) {
initializeClientSideCache();
}
}

@Override
Expand Down Expand Up @@ -66,7 +73,10 @@ protected void protocolReadPushes(RedisInputStream inputStream) {
@Override
public void disconnect() {
super.disconnect();
cache.flush();
if (isDefaultMode) {
// The cache is cleared when the connection is disconnected only in default mode
cache.flush();
}
}

@Override
Expand All @@ -79,6 +89,11 @@ public <T> T executeCommand(final CommandObject<T> commandObject) {

CacheEntry<T> cacheEntry = cache.get(cacheKey);
if (cacheEntry != null) { // (probable) CACHE HIT !!
if (!isDefaultMode) {
// broadcast mode returns directly
cache.getStats().hit();
return cacheEntry.getValue();
}
cacheEntry = validateEntry(cacheEntry);
if (cacheEntry != null) {
// CACHE HIT confirmed !!!
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/redis/clients/jedis/csc/TrackingConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package redis.clients.jedis.csc;

import java.util.Collections;
import java.util.List;

public final class TrackingConfig {

/**
* tracking mode(true:default; false:broadcast)
*/
private final boolean trackingModeOnDefault;

/**
* tracking prefix list(only broadcast mode)
*/
private final List<String> trackingPrefixList;

public TrackingConfig(List<String> trackingPrefixList, boolean trackingModeOnDefault) {
this.trackingPrefixList = trackingPrefixList;
this.trackingModeOnDefault = trackingModeOnDefault;
}

public boolean isTrackingModeOnDefault() {
return trackingModeOnDefault;
}

public List<String> getTrackingPrefixList() {
return trackingPrefixList;
}

public static final TrackingConfig DEFAULT = new TrackingConfig(null, true);

/**
* prefix: ""
*/
public static final TrackingConfig BROADCAST = new TrackingConfig(Collections.emptyList(), false);
}
50 changes: 50 additions & 0 deletions src/test/java/redis/clients/jedis/csc/CSCTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package redis.clients.jedis.csc;

import org.junit.Test;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.commands.ProtocolCommand;

import java.util.List;

public class CSCTest {
private static JedisClientConfig clientConfig;
private static CacheConfig cacheConfig;

static {
clientConfig = DefaultJedisClientConfig.builder()
.resp3() // RESP3 protocol is required for client-side caching
.trackingConfig(TrackingConfig.BROADCAST)
.build();

cacheConfig = getCacheConfig();
}

private static CacheConfig getCacheConfig() {

// This is a simple cacheable implementation that ignores keys starting with "ignore_me"
Cacheable cacheable = new DefaultCacheable() {
@Override
public boolean isCacheable(ProtocolCommand command, List<Object> keys) {
return isDefaultCacheableCommand(command);
}
};

// Create a cache with a maximum size of 10000 entries
return CacheConfig.builder()
.maxSize(10000)
.cacheable(cacheable)
.build();
}

@Test
public void testTrackingOnBroadcastMode() {
HostAndPort node = HostAndPort.from("127.0.0.1:6379");
try (UnifiedJedis client = new UnifiedJedis(node, clientConfig, CacheFactory.getCache(cacheConfig))) {
String a1 = client.get("a");
String a2 = client.get("a");
}
}
}