Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Commit

Permalink
Merge pull request #149 from matthewbogner/idle-connection-reaping
Browse files Browse the repository at this point in the history
New feature: Idle connection reaper - opt-in ability to clean idle connections in the pool periodically.
  • Loading branch information
Cihat Keser committed Sep 30, 2014
2 parents 1f30077 + f587e90 commit 2f428ff
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.searchbox.client.config.ServerList;
import io.searchbox.client.config.discovery.NodeChecker;
import io.searchbox.client.config.exception.NoServerConfiguredException;
import io.searchbox.client.config.idle.IdleConnectionReaper;
import io.searchbox.client.util.PaddedAtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -25,12 +26,18 @@ public abstract class AbstractJestClient implements JestClient {
protected Gson gson = new GsonBuilder()
.setDateFormat(ELASTIC_SEARCH_DATE_FORMAT)
.create();

private NodeChecker nodeChecker;
private IdleConnectionReaper idleConnectionReaper;

public void setNodeChecker(NodeChecker nodeChecker) {
this.nodeChecker = nodeChecker;
}

public void setIdleConnectionReaper(IdleConnectionReaper idleConnectionReaper) {
this.idleConnectionReaper = idleConnectionReaper;
}

public LinkedHashSet<String> getServers() {
ServerList server = listOfServers.get();
if (server != null) return new LinkedHashSet<String>(server.getServers());
Expand All @@ -56,6 +63,10 @@ public void shutdownClient() {
nodeChecker.stopAsync();
nodeChecker.awaitTerminated();
}
if (null != idleConnectionReaper) {
idleConnectionReaper.stopAsync();
idleConnectionReaper.awaitTerminated();
}
}

protected String getElasticSearchServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class ClientConfig {
private int connTimeout;
private int readTimeout;
private TimeUnit discoveryFrequencyTimeUnit;
private long maxConnectionIdleTime;
private TimeUnit maxConnectionIdleTimeDurationTimeUnit;
private Gson gson;

private ClientConfig() {
Expand All @@ -35,6 +37,8 @@ public ClientConfig(AbstractBuilder builder) {
this.discoveryFrequencyTimeUnit = builder.discoveryFrequencyTimeUnit;
this.connTimeout = builder.connTimeout;
this.readTimeout = builder.readTimeout;
this.maxConnectionIdleTime = builder.maxConnectionIdleTime;
this.maxConnectionIdleTimeDurationTimeUnit = builder.maxConnectionIdleTimeDurationTimeUnit;
this.gson = builder.gson;
}

Expand Down Expand Up @@ -66,6 +70,14 @@ public int getConnTimeout() {
return connTimeout;
}

public long getMaxConnectionIdleTime() {
return maxConnectionIdleTime;
}

public TimeUnit getMaxConnectionIdleTimeDurationTimeUnit() {
return maxConnectionIdleTimeDurationTimeUnit;
}

public Gson getGson() {
return gson;
}
Expand Down Expand Up @@ -99,6 +111,8 @@ protected static abstract class AbstractBuilder<T extends ClientConfig, K extend
protected boolean isDiscoveryEnabled;
protected long discoveryFrequency = 10L;
protected TimeUnit discoveryFrequencyTimeUnit = TimeUnit.SECONDS;
protected long maxConnectionIdleTime = -1L;
protected TimeUnit maxConnectionIdleTimeDurationTimeUnit = TimeUnit.SECONDS;
protected Gson gson;

public AbstractBuilder(Collection<String> serverUris) {
Expand Down Expand Up @@ -171,6 +185,12 @@ public K defaultMaxTotalConnectionPerRoute(int defaultMaxTotalConnectionPerRoute
return (K) this;
}

public K maxConnectionIdleTime(long duration, TimeUnit maxConnectionIdleTimeDurationTimeUnit) {
this.maxConnectionIdleTime = duration;
this.maxConnectionIdleTimeDurationTimeUnit = maxConnectionIdleTimeDurationTimeUnit;
return (K) this;
}

abstract public T build();

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.searchbox.client.config.idle;

import com.google.common.util.concurrent.AbstractScheduledService;
import io.searchbox.client.config.ClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used to reap idle connections from the connection manager.
*/
public class IdleConnectionReaper extends AbstractScheduledService {

final static Logger logger = LoggerFactory.getLogger(IdleConnectionReaper.class);

private final ReapableConnectionManager reapableConnectionManager;
private final ClientConfig clientConfig;

public IdleConnectionReaper(ClientConfig clientConfig, ReapableConnectionManager reapableConnectionManager) {
this.reapableConnectionManager = reapableConnectionManager;
this.clientConfig = clientConfig;
}

@Override
protected void runOneIteration() throws Exception {
logger.debug("closing idle connections...");
reapableConnectionManager.closeIdleConnections(clientConfig.getMaxConnectionIdleTime(),
clientConfig.getMaxConnectionIdleTimeDurationTimeUnit());
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0l,
clientConfig.getMaxConnectionIdleTime(),
clientConfig.getMaxConnectionIdleTimeDurationTimeUnit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.searchbox.client.config.idle;

import java.util.concurrent.TimeUnit;

public interface ReapableConnectionManager {
void closeIdleConnections(long idleTimeout, TimeUnit unit);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.searchbox.client.config;

import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

/**
* @author Min Cha
*/
Expand All @@ -25,4 +27,21 @@ public void testTimeoutSettingsAsDefault() {
assertTrue(config.getConnTimeout() > 0);
assertTrue(config.getReadTimeout() > 0);
}

@Test
public void testDefaultMaxIdleConnectionTime() {
ClientConfig config = new ClientConfig.Builder("someUri").multiThreaded(true).build();

assertEquals(-1L, config.getMaxConnectionIdleTime());
assertEquals(TimeUnit.SECONDS, config.getMaxConnectionIdleTimeDurationTimeUnit());
}

@Test
public void testCustomMaxIdleConnectionTime() {
ClientConfig config = new ClientConfig.Builder("someUri").multiThreaded(true)
.maxConnectionIdleTime(30L, TimeUnit.MINUTES)
.build();
assertEquals(30L, config.getMaxConnectionIdleTime());
assertEquals(TimeUnit.MINUTES, config.getMaxConnectionIdleTimeDurationTimeUnit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.searchly.jestdroid;

import ch.boye.httpclientandroidlib.impl.conn.PoolingClientConnectionManager;
import io.searchbox.client.config.idle.ReapableConnectionManager;

import java.util.concurrent.TimeUnit;

public class DroidReapableConnectionManager implements ReapableConnectionManager {

private final PoolingClientConnectionManager connectionManager;

public DroidReapableConnectionManager(PoolingClientConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

@Override
public void closeIdleConnections(long idleTimeout, TimeUnit unit) {
connectionManager.closeIdleConnections(idleTimeout, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.gson.Gson;
import io.searchbox.client.JestClient;
import io.searchbox.client.config.discovery.NodeChecker;
import io.searchbox.client.config.idle.IdleConnectionReaper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,6 +49,17 @@ public JestClient getObject() {
}
httpclient = new DefaultHttpClient(cm);
log.debug("Multi Threaded http client created");

// schedule idle connection reaping if configured
if (droidClientConfig.getMaxConnectionIdleTime() > 0) {
log.info("Idle connection reaping enabled...");

IdleConnectionReaper reaper = new IdleConnectionReaper(droidClientConfig, new DroidReapableConnectionManager(cm));
client.setIdleConnectionReaper(reaper);
reaper.startAsync();
reaper.awaitRunning();
}

} else {
httpclient = new DefaultHttpClient();
log.debug("Default http client is created without multi threaded option");
Expand Down
Binary file not shown.
23 changes: 19 additions & 4 deletions jest/src/main/java/io/searchbox/client/JestClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.google.gson.Gson;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.client.config.discovery.NodeChecker;
import io.searchbox.client.config.idle.HttpReapableConnectionManager;
import io.searchbox.client.config.idle.IdleConnectionReaper;
import io.searchbox.client.http.JestHttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.HttpClientConnectionManager;
Expand Down Expand Up @@ -36,15 +38,16 @@ public JestClient getObject() {
if (httpClientConfig != null) {
log.debug("Creating HTTP client based on configuration");
client.setServers(httpClientConfig.getServerList());
client.setHttpClient(createHttpClient());
final HttpClientConnectionManager connectionManager = createConnectionManager();
client.setHttpClient(createHttpClient(connectionManager));

// set custom gson instance
Gson gson = httpClientConfig.getGson();
if (gson != null) {
client.setGson(gson);
}

//set discovery (should be set after setting the httpClient on jestClient)
// set discovery (should be set after setting the httpClient on jestClient)
if (httpClientConfig.isDiscoveryEnabled()) {
log.info("Node Discovery Enabled...");
NodeChecker nodeChecker = new NodeChecker(httpClientConfig, client);
Expand All @@ -54,6 +57,18 @@ public JestClient getObject() {
} else {
log.info("Node Discovery Disabled...");
}

// schedule idle connection reaping if configured
if (httpClientConfig.getMaxConnectionIdleTime() > 0) {
log.info("Idle connection reaping enabled...");

IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager));
client.setIdleConnectionReaper(reaper);
reaper.startAsync();
reaper.awaitRunning();
}


} else {
log.debug("There is no configuration to create http client. Going to create simple client with default values");
client.setHttpClient(HttpClients.createDefault());
Expand All @@ -66,9 +81,9 @@ public JestClient getObject() {
return client;
}

private CloseableHttpClient createHttpClient() {
private CloseableHttpClient createHttpClient(HttpClientConnectionManager connectionManager) {
return configureHttpClient(HttpClients.custom()
.setConnectionManager(createConnectionManager())
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(createRequestConfig()))
.setRoutePlanner(getRoutePlanner())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.searchbox.client.config.idle;

import org.apache.http.conn.HttpClientConnectionManager;

import java.util.concurrent.TimeUnit;

public class HttpReapableConnectionManager implements ReapableConnectionManager {
private final HttpClientConnectionManager connectionManager;

public HttpReapableConnectionManager(HttpClientConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

@Override
public void closeIdleConnections(long idleTimeout, TimeUnit unit) {
connectionManager.closeIdleConnections(idleTimeout, unit);
}
}
Loading

0 comments on commit 2f428ff

Please # to comment.