Skip to content

Commit

Permalink
(feat) Adds more constructors for AWSElasticCacheClient
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Nov 3, 2016
1 parent 6fb489e commit 4af068c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
24 changes: 22 additions & 2 deletions src/main/java/net/rubyeye/xmemcached/XMemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,9 @@ public XMemcachedClient(final InetSocketAddress inetSocketAddress,
throw new IllegalArgumentException("Null InetSocketAddress");

}
if (cmdFactory == null) {
throw new IllegalArgumentException("Null command factory.");
}
if (weight <= 0) {
throw new IllegalArgumentException("weight<=0");
}
Expand Down Expand Up @@ -1002,16 +1005,33 @@ private final boolean isWindowsPlatform() {
*/
public XMemcachedClient(List<InetSocketAddress> addressList)
throws IOException {
this(addressList, new TextCommandFactory());
}

/**
* XMemcached Constructor.Every server's weight is one by default.
*
* @param cmdFactory
* command factory
* @param addressList
* memcached server socket address list.
* @throws IOException
*/
public XMemcachedClient(List<InetSocketAddress> addressList,
CommandFactory cmdFactory) throws IOException {
super();
if (cmdFactory == null) {
throw new IllegalArgumentException("Null command factory.");
}
if (addressList == null || addressList.isEmpty()) {
throw new IllegalArgumentException("Empty address list");
}
BufferAllocator simpleBufferAllocator = new SimpleBufferAllocator();
this.buildConnector(new ArrayMemcachedSessionLocator(),
simpleBufferAllocator,
XMemcachedClientBuilder.getDefaultConfiguration(),
XMemcachedClientBuilder.getDefaultSocketOptions(),
new TextCommandFactory(), new SerializingTranscoder());
XMemcachedClientBuilder.getDefaultSocketOptions(), cmdFactory,
new SerializingTranscoder());
this.start0();
for (InetSocketAddress inetSocketAddress : addressList) {
this.connect(new InetSocketAddressWrapper(inetSocketAddress,
Expand Down
55 changes: 42 additions & 13 deletions src/main/java/net/rubyeye/xmemcached/aws/AWSElasticCacheClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ public class AWSElasticCacheClient extends XMemcachedClient implements

private boolean firstTimeUpdate = true;

private InetSocketAddress configAddr;
private List<InetSocketAddress> configAddrs = new ArrayList<InetSocketAddress>();

public synchronized void onUpdate(ClusterConfigration config) {

if (firstTimeUpdate) {
firstTimeUpdate = false;
removeConfigAddr();
removeConfigAddrs();
}

List<CacheNode> oldList = this.currentClusterConfiguration != null ? this.currentClusterConfiguration
Expand Down Expand Up @@ -79,14 +79,17 @@ public synchronized void onUpdate(ClusterConfigration config) {
this.currentClusterConfiguration = config;
}

private void removeConfigAddr() {
this.removeAddr(configAddr);
while (this.getConnector().getSessionByAddress(configAddr) != null
&& this.getConnector().getSessionByAddress(configAddr).size() > 0) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
private void removeConfigAddrs() {
for (InetSocketAddress configAddr : this.configAddrs) {
this.removeAddr(configAddr);
while (this.getConnector().getSessionByAddress(configAddr) != null
&& this.getConnector().getSessionByAddress(configAddr)
.size() > 0) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Expand All @@ -105,21 +108,47 @@ public AWSElasticCacheClient(InetSocketAddress addr,
}

public AWSElasticCacheClient(InetSocketAddress addr,
long pollConfigIntervalMills, CommandFactory cmdFactory)
throws IOException {
this(asList(addr), pollConfigIntervalMills, cmdFactory);
}

private static List<InetSocketAddress> asList(InetSocketAddress addr) {
List<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
addrs.add(addr);
return addrs;
}

public AWSElasticCacheClient(List<InetSocketAddress> addrs)
throws IOException {
this(addrs, DEFAULT_POLL_CONFIG_INTERVAL_MS);
}

public AWSElasticCacheClient(List<InetSocketAddress> addrs,
long pollConfigIntervalMills) throws IOException {
this(addrs, pollConfigIntervalMills, new TextCommandFactory());
}

public AWSElasticCacheClient(List<InetSocketAddress> addrs,
long pollConfigIntervalMills, CommandFactory commandFactory)
throws IOException {
super(addr, 1, commandFactory);
super(addrs, commandFactory);
if (pollConfigIntervalMills <= 0) {
throw new IllegalArgumentException(
"Invalid pollConfigIntervalMills value.");
}
// Use failure mode by default.
this.commandFactory = commandFactory;
this.setFailureMode(true);
this.configAddr = addr;
this.configAddrs = addrs;
this.configPoller = new ConfigurationPoller(this,
pollConfigIntervalMills);
// Run at once to get config at startup.
// It will call onUpdate in the same thread.
this.configPoller.run();
if (this.currentClusterConfiguration == null) {
throw new IllegalStateException(
"Retrieve ElasticCache config from `" + addr.toString()
"Retrieve ElasticCache config from `" + addrs.toString()
+ "` failed.");
}
this.configPoller.start();
Expand Down

0 comments on commit 4af068c

Please # to comment.