Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
package redis.clients.jedis;

import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.IOUtils;

public class ClusterPipeline extends MultiNodePipelineBase {

private final ClusterConnectionProvider provider;
private AutoCloseable closeable = null;

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig));
this.closeable = this.provider;
}

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig));
this.closeable = this.provider;
}

public ClusterPipeline(ClusterConnectionProvider provider) {
super(new ClusterCommandObjects());
this.provider = provider;
}

@Override
public void close() {
try {
super.close();
} finally {
IOUtils.closeQuietly(closeable);
}
}

Comment on lines +29 to +37
Copy link

@dolavb dolavb Feb 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this close the cache refresher whenever we close a pipeline? A pipeline does not have the same lifecyle as the connection provider, I do not understand this close here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dolavb
Looking at the newly introduced constructor they are creating a fresh
new ClusterConnectionProvider(clusterNodes, clientConfig)
and only in this case preserve it for clean up later.

I think the idea is since we create it inside the constructor, we clean it

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I had misread the initialization of the provider to be the setting of the closable.

@Override
protected HostAndPort getNodeKey(CommandArguments args) {
return provider.getNode(((ClusterCommandArguments) args).getCommandHashSlot());
Expand Down
62 changes: 31 additions & 31 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,80 +108,80 @@ public JedisCluster(Set<HostAndPort> nodes, int timeout,
this(nodes, timeout, DEFAULT_MAX_ATTEMPTS, poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts,
public JedisCluster(Set<HostAndPort> clusterNodes, int timeout, int maxAttempts,
final GenericObjectPoolConfig<Connection> poolConfig) {
this(jedisClusterNode, timeout, timeout, maxAttempts, poolConfig);
this(clusterNodes, timeout, timeout, maxAttempts, poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
int soTimeout, int maxAttempts, final GenericObjectPoolConfig<Connection> poolConfig) {
this(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, null, poolConfig);
public JedisCluster(Set<HostAndPort> clusterNodes, int connectionTimeout, int soTimeout,
int maxAttempts, final GenericObjectPoolConfig<Connection> poolConfig) {
this(clusterNodes, connectionTimeout, soTimeout, maxAttempts, null, poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
int soTimeout, int maxAttempts, String password, GenericObjectPoolConfig<Connection> poolConfig) {
this(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, null, poolConfig);
public JedisCluster(Set<HostAndPort> clusterNodes, int connectionTimeout, int soTimeout,
int maxAttempts, String password, GenericObjectPoolConfig<Connection> poolConfig) {
this(clusterNodes, connectionTimeout, soTimeout, maxAttempts, password, null, poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
public JedisCluster(Set<HostAndPort> clusterNodes, int connectionTimeout,
int soTimeout, int maxAttempts, String password, String clientName,
GenericObjectPoolConfig<Connection> poolConfig) {
this(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, null, password, clientName,
this(clusterNodes, connectionTimeout, soTimeout, maxAttempts, null, password, clientName,
poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
int soTimeout, int maxAttempts, String user, String password, String clientName,
public JedisCluster(Set<HostAndPort> clusterNodes, int connectionTimeout, int soTimeout,
int maxAttempts, String user, String password, String clientName,
GenericObjectPoolConfig<Connection> poolConfig) {
this(jedisClusterNode, DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
this(clusterNodes, DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
.socketTimeoutMillis(soTimeout).user(user).password(password).clientName(clientName).build(),
maxAttempts, poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
public JedisCluster(Set<HostAndPort> clusterNodes, int connectionTimeout,
int soTimeout, int infiniteSoTimeout, int maxAttempts, String user, String password,
String clientName, GenericObjectPoolConfig<Connection> poolConfig) {
this(jedisClusterNode, DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
this(clusterNodes, DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
.socketTimeoutMillis(soTimeout).blockingSocketTimeoutMillis(infiniteSoTimeout)
.user(user).password(password).clientName(clientName).build(), maxAttempts, poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
int soTimeout, int maxAttempts, String password, String clientName,
public JedisCluster(Set<HostAndPort> clusterNodes, int connectionTimeout, int soTimeout,
int maxAttempts, String password, String clientName,
GenericObjectPoolConfig<Connection> poolConfig, boolean ssl) {
this(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, null, password, clientName,
this(clusterNodes, connectionTimeout, soTimeout, maxAttempts, null, password, clientName,
poolConfig, ssl);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
int soTimeout, int maxAttempts, String user, String password, String clientName,
public JedisCluster(Set<HostAndPort> clusterNodes, int connectionTimeout, int soTimeout,
int maxAttempts, String user, String password, String clientName,
GenericObjectPoolConfig<Connection> poolConfig, boolean ssl) {
this(jedisClusterNode, DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
this(clusterNodes, DefaultJedisClientConfig.builder().connectionTimeoutMillis(connectionTimeout)
.socketTimeoutMillis(soTimeout).user(user).password(password).clientName(clientName).ssl(ssl).build(),
maxAttempts, poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, JedisClientConfig clientConfig,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
int maxAttempts, GenericObjectPoolConfig<Connection> poolConfig) {
this(jedisClusterNode, clientConfig, maxAttempts,
this(clusterNodes, clientConfig, maxAttempts,
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts), poolConfig);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, JedisClientConfig clientConfig,
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
super(jedisClusterNode, clientConfig, poolConfig, maxAttempts, maxTotalRetriesDuration);
super(clusterNodes, clientConfig, poolConfig, maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig) {
this(jedisClusterNodes, clientConfig, DEFAULT_MAX_ATTEMPTS);
public JedisCluster(Set<HostAndPort> clusterNodess, JedisClientConfig clientConfig) {
this(clusterNodess, clientConfig, DEFAULT_MAX_ATTEMPTS);
}

public JedisCluster(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig, int maxAttempts) {
super(jedisClusterNodes, clientConfig, maxAttempts);
public JedisCluster(Set<HostAndPort> clusterNodess, JedisClientConfig clientConfig, int maxAttempts) {
super(clusterNodess, clientConfig, maxAttempts);
}

public JedisCluster(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig, int maxAttempts, Duration maxTotalRetriesDuration) {
super(jedisClusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration);
public JedisCluster(Set<HostAndPort> clusterNodess, JedisClientConfig clientConfig, int maxAttempts, Duration maxTotalRetriesDuration) {
super(clusterNodess, clientConfig, maxAttempts, maxTotalRetriesDuration);
}

public Map<String, ConnectionPool> getClusterNodes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
}

@Override
public final void close() {
public void close() {
sync();
for (Connection connection : connections.values()) {
connection.close();
Expand Down
27 changes: 26 additions & 1 deletion src/main/java/redis/clients/jedis/ShardedPipeline.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,47 @@
package redis.clients.jedis;

import redis.clients.jedis.providers.ShardedConnectionProvider;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.ShardedConnectionProvider;
import redis.clients.jedis.util.Hashing;
import redis.clients.jedis.util.IOUtils;

public class ShardedPipeline extends MultiNodePipelineBase {

private final ShardedConnectionProvider provider;
private AutoCloseable closeable = null;

public ShardedPipeline(List<HostAndPort> shards, JedisClientConfig clientConfig) {
this(new ShardedConnectionProvider(shards, clientConfig));
this.closeable = this.provider;
}

public ShardedPipeline(ShardedConnectionProvider provider) {
super(new ShardedCommandObjects(provider.getHashingAlgo()));
this.provider = provider;
}

public ShardedPipeline(List<HostAndPort> shards, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Hashing algo, Pattern tagPattern) {
this(new ShardedConnectionProvider(shards, clientConfig, poolConfig, algo), tagPattern);
this.closeable = this.provider;
}

public ShardedPipeline(ShardedConnectionProvider provider, Pattern tagPattern) {
super(new ShardedCommandObjects(provider.getHashingAlgo(), tagPattern));
this.provider = provider;
}

@Override
public void close() {
try {
super.close();
} finally {
IOUtils.closeQuietly(closeable);
}
}

@Override
protected HostAndPort getNodeKey(CommandArguments args) {
return provider.getNode(((ShardedCommandArguments) args).getKeyHash());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ public class ClusterConnectionProvider implements ConnectionProvider {

protected final JedisClusterInfoCache cache;

public ClusterConnectionProvider(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig) {
public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig) {
this.cache = new JedisClusterInfoCache(clientConfig);
initializeSlotsCache(jedisClusterNodes, clientConfig);
initializeSlotsCache(clusterNodes, clientConfig);
}

public ClusterConnectionProvider(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig,
public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this.cache = new JedisClusterInfoCache(clientConfig, poolConfig);
initializeSlotsCache(jedisClusterNodes, clientConfig);
initializeSlotsCache(clusterNodes, clientConfig);
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig clientConfig) {
Expand Down
Loading