Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

idle connection reaper #149

Merged
merged 4 commits into from
Sep 30, 2014
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.searchbox.client.config.ServerList;
import io.searchbox.client.config.discovery.NodeChecker;
import io.searchbox.client.config.exception.NoServerConfiguredException;
import io.searchbox.client.config.idle.IdleConnectionReaper;
import io.searchbox.client.util.PaddedAtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -25,12 +26,18 @@ public abstract class AbstractJestClient implements JestClient {
protected Gson gson = new GsonBuilder()
.setDateFormat(ELASTIC_SEARCH_DATE_FORMAT)
.create();

private NodeChecker nodeChecker;
private IdleConnectionReaper idleConnectionReaper;

public void setNodeChecker(NodeChecker nodeChecker) {
this.nodeChecker = nodeChecker;
}

public void setIdleConnectionReaper(IdleConnectionReaper idleConnectionReaper) {
this.idleConnectionReaper = idleConnectionReaper;
}

public LinkedHashSet<String> getServers() {
ServerList server = listOfServers.get();
if (server != null) return new LinkedHashSet<String>(server.getServers());
Expand All @@ -56,6 +63,10 @@ public void shutdownClient() {
nodeChecker.stopAsync();
nodeChecker.awaitTerminated();
}
if (null != idleConnectionReaper) {
idleConnectionReaper.stopAsync();
idleConnectionReaper.awaitTerminated();
}
}

protected String getElasticSearchServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class ClientConfig {
private int connTimeout;
private int readTimeout;
private TimeUnit discoveryFrequencyTimeUnit;
private long maxConnectionIdleTime;
private TimeUnit maxConnectionIdleTimeDurationTimeUnit;
private Gson gson;

private ClientConfig() {
Expand All @@ -35,6 +37,8 @@ public ClientConfig(AbstractBuilder builder) {
this.discoveryFrequencyTimeUnit = builder.discoveryFrequencyTimeUnit;
this.connTimeout = builder.connTimeout;
this.readTimeout = builder.readTimeout;
this.maxConnectionIdleTime = builder.maxConnectionIdleTime;
this.maxConnectionIdleTimeDurationTimeUnit = builder.maxConnectionIdleTimeDurationTimeUnit;
this.gson = builder.gson;
}

Expand Down Expand Up @@ -66,6 +70,14 @@ public int getConnTimeout() {
return connTimeout;
}

public long getMaxConnectionIdleTime() {
return maxConnectionIdleTime;
}

public TimeUnit getMaxConnectionIdleTimeDurationTimeUnit() {
return maxConnectionIdleTimeDurationTimeUnit;
}

public Gson getGson() {
return gson;
}
Expand Down Expand Up @@ -99,6 +111,8 @@ protected static abstract class AbstractBuilder<T extends ClientConfig, K extend
protected boolean isDiscoveryEnabled;
protected long discoveryFrequency = 10L;
protected TimeUnit discoveryFrequencyTimeUnit = TimeUnit.SECONDS;
protected long maxConnectionIdleTime = -1L;
protected TimeUnit maxConnectionIdleTimeDurationTimeUnit = TimeUnit.SECONDS;
protected Gson gson;

public AbstractBuilder(Collection<String> serverUris) {
Expand Down Expand Up @@ -171,6 +185,12 @@ public K defaultMaxTotalConnectionPerRoute(int defaultMaxTotalConnectionPerRoute
return (K) this;
}

public K maxConnectionIdleTime(long duration, TimeUnit maxConnectionIdleTimeDurationTimeUnit) {
this.maxConnectionIdleTime = duration;
this.maxConnectionIdleTimeDurationTimeUnit = maxConnectionIdleTimeDurationTimeUnit;
return (K) this;
}

abstract public T build();

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.searchbox.client.config.idle;

import com.google.common.util.concurrent.AbstractScheduledService;
import io.searchbox.client.config.ClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used to reap idle connections from the connection manager.
*/
public class IdleConnectionReaper extends AbstractScheduledService {

final static Logger logger = LoggerFactory.getLogger(IdleConnectionReaper.class);

private final ReapableConnectionManager reapableConnectionManager;
private final ClientConfig clientConfig;

public IdleConnectionReaper(ClientConfig clientConfig, ReapableConnectionManager reapableConnectionManager) {
this.reapableConnectionManager = reapableConnectionManager;
this.clientConfig = clientConfig;
}

@Override
protected void runOneIteration() throws Exception {
logger.info("closing idle connections...");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will probably get too chatty for regular usage, won't it? I suggest changing the level to debug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed (was following lead of NodeChecker).

reapableConnectionManager.closeIdleConnections(clientConfig.getMaxConnectionIdleTime(),
clientConfig.getMaxConnectionIdleTimeDurationTimeUnit());
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0l,
clientConfig.getMaxConnectionIdleTime(),
clientConfig.getMaxConnectionIdleTimeDurationTimeUnit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.searchbox.client.config.idle;

import java.util.concurrent.TimeUnit;

public interface ReapableConnectionManager {
void closeIdleConnections(long idleTimeout, TimeUnit unit);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.searchbox.client.config;

import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

/**
* @author Min Cha
*/
Expand All @@ -25,4 +27,21 @@ public void testTimeoutSettingsAsDefault() {
assertTrue(config.getConnTimeout() > 0);
assertTrue(config.getReadTimeout() > 0);
}

@Test
public void testDefaultMaxIdleConnectionTime() {
ClientConfig config = new ClientConfig.Builder("someUri").multiThreaded(true).build();

assertEquals(-1L, config.getMaxConnectionIdleTime());
assertEquals(TimeUnit.SECONDS, config.getMaxConnectionIdleTimeDurationTimeUnit());
}

@Test
public void testCustomMaxIdleConnectionTime() {
ClientConfig config = new ClientConfig.Builder("someUri").multiThreaded(true)
.maxConnectionIdleTime(30L, TimeUnit.MINUTES)
.build();
assertEquals(30L, config.getMaxConnectionIdleTime());
assertEquals(TimeUnit.MINUTES, config.getMaxConnectionIdleTimeDurationTimeUnit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.searchly.jestdroid;

import ch.boye.httpclientandroidlib.impl.conn.PoolingClientConnectionManager;
import io.searchbox.client.config.idle.ReapableConnectionManager;

import java.util.concurrent.TimeUnit;

public class DroidReapableConnectionManager implements ReapableConnectionManager {

private final PoolingClientConnectionManager connectionManager;

public DroidReapableConnectionManager(PoolingClientConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

@Override
public void closeIdleConnections(long idleTimeout, TimeUnit unit) {
connectionManager.closeIdleConnections(idleTimeout, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.gson.Gson;
import io.searchbox.client.JestClient;
import io.searchbox.client.config.discovery.NodeChecker;
import io.searchbox.client.config.idle.IdleConnectionReaper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,6 +49,17 @@ public JestClient getObject() {
}
httpclient = new DefaultHttpClient(cm);
log.debug("Multi Threaded http client created");

// schedule idle connection reaping if configured
if (droidClientConfig.getMaxConnectionIdleTime() > 0) {
log.info("Idle connection reaping enabled...");

IdleConnectionReaper reaper = new IdleConnectionReaper(droidClientConfig, new DroidReapableConnectionManager(cm));
client.setIdleConnectionReaper(reaper);
reaper.startAsync();
reaper.awaitRunning();
}

} else {
httpclient = new DefaultHttpClient();
log.debug("Default http client is created without multi threaded option");
Expand Down
Binary file not shown.
23 changes: 19 additions & 4 deletions jest/src/main/java/io/searchbox/client/JestClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.google.gson.Gson;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.client.config.discovery.NodeChecker;
import io.searchbox.client.config.idle.HttpReapableConnectionManager;
import io.searchbox.client.config.idle.IdleConnectionReaper;
import io.searchbox.client.http.JestHttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.HttpClientConnectionManager;
Expand Down Expand Up @@ -36,15 +38,16 @@ public JestClient getObject() {
if (httpClientConfig != null) {
log.debug("Creating HTTP client based on configuration");
client.setServers(httpClientConfig.getServerList());
client.setHttpClient(createHttpClient());
final HttpClientConnectionManager connectionManager = createConnectionManager();
client.setHttpClient(createHttpClient(connectionManager));

// set custom gson instance
Gson gson = httpClientConfig.getGson();
if (gson != null) {
client.setGson(gson);
}

//set discovery (should be set after setting the httpClient on jestClient)
// set discovery (should be set after setting the httpClient on jestClient)
if (httpClientConfig.isDiscoveryEnabled()) {
log.info("Node Discovery Enabled...");
NodeChecker nodeChecker = new NodeChecker(httpClientConfig, client);
Expand All @@ -54,6 +57,18 @@ public JestClient getObject() {
} else {
log.info("Node Discovery Disabled...");
}

// schedule idle connection reaping if configured
if (httpClientConfig.getMaxConnectionIdleTime() > 0) {
log.info("Idle connection reaping enabled...");

IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager));
client.setIdleConnectionReaper(reaper);
reaper.startAsync();
reaper.awaitRunning();
}


} else {
log.debug("There is no configuration to create http client. Going to create simple client with default values");
client.setHttpClient(HttpClients.createDefault());
Expand All @@ -66,9 +81,9 @@ public JestClient getObject() {
return client;
}

private CloseableHttpClient createHttpClient() {
private CloseableHttpClient createHttpClient(HttpClientConnectionManager connectionManager) {
return configureHttpClient(HttpClients.custom()
.setConnectionManager(createConnectionManager())
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(createRequestConfig()))
.setRoutePlanner(getRoutePlanner())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.searchbox.client.config.idle;

import org.apache.http.conn.HttpClientConnectionManager;

import java.util.concurrent.TimeUnit;

public class HttpReapableConnectionManager implements ReapableConnectionManager {
private final HttpClientConnectionManager connectionManager;

public HttpReapableConnectionManager(HttpClientConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

@Override
public void closeIdleConnections(long idleTimeout, TimeUnit unit) {
connectionManager.closeIdleConnections(idleTimeout, unit);
}
}
Loading