25
25
import java .util .List ;
26
26
import java .util .Map ;
27
27
import java .util .concurrent .CompletableFuture ;
28
- import java .util .concurrent .ExecutionException ;
28
+ import java .util .concurrent .CompletionException ;
29
29
import java .util .concurrent .ExecutorService ;
30
30
import java .util .concurrent .Executors ;
31
31
import java .util .concurrent .Future ;
37
37
38
38
import io .netty .bootstrap .Bootstrap ;
39
39
import io .netty .channel .Channel ;
40
+ import io .netty .channel .ChannelDuplexHandler ;
41
+ import io .netty .channel .ChannelHandlerContext ;
40
42
import io .netty .channel .ChannelInitializer ;
41
43
import io .netty .channel .ChannelOption ;
42
44
import io .netty .channel .ChannelPipeline ;
58
60
import io .netty .handler .ssl .ClientAuth ;
59
61
import io .netty .handler .ssl .JdkSslContext ;
60
62
import io .netty .handler .stream .ChunkedWriteHandler ;
63
+ import io .netty .handler .timeout .IdleState ;
64
+ import io .netty .handler .timeout .IdleStateEvent ;
65
+ import io .netty .handler .timeout .IdleStateHandler ;
61
66
import io .netty .util .concurrent .GenericFutureListener ;
62
67
import org .glassfish .jersey .client .ClientProperties ;
63
68
import org .glassfish .jersey .client .ClientRequest ;
@@ -79,9 +84,30 @@ class NettyConnector implements Connector {
79
84
final Client client ;
80
85
final HashMap <String , ArrayList <Channel >> connections = new HashMap <>();
81
86
87
+ // If HTTP keepalive is enabled the value of "http.maxConnections" determines the maximum number
88
+ // of idle connections that will be simultaneously kept alive, per destination.
89
+ private static final String HTTP_KEEPALIVE_STRING = System .getProperty ("http.keepAlive" );
90
+ // http.keepalive (default: true)
91
+ private static final Boolean HTTP_KEEPALIVE =
92
+ HTTP_KEEPALIVE_STRING == null ? Boolean .TRUE : Boolean .parseBoolean (HTTP_KEEPALIVE_STRING );
93
+
94
+ // http.maxConnections (default: 5)
95
+ private static final int DEFAULT_MAX_POOL_SIZE = 5 ;
96
+ private static final int MAX_POOL_SIZE = Integer .getInteger ("http.maxConnections" , DEFAULT_MAX_POOL_SIZE );
97
+ private static final int MAX_POOL_IDLE = 60 ;
98
+
99
+ private final Integer maxPoolSize ; // either from system property, or from Jersey config, or default
100
+ private final Integer maxPoolIdle ; // either from Jersey config, or default
101
+
102
+ private static final String INACTIVE_POOLED_CONNECTION_HANDLER = "inactive_pooled_connection_handler" ;
103
+ private static final String PRUNE_INACTIVE_POOL = "prune_inactive_pool" ;
104
+ private static final String READ_TIMEOUT_HANDLER = "read_timeout_handler" ;
105
+ private static final String REQUEST_HANDLER = "request_handler" ;
106
+
82
107
NettyConnector (Client client ) {
83
108
84
- final Object threadPoolSize = client .getConfiguration ().getProperties ().get (ClientProperties .ASYNC_THREADPOOL_SIZE );
109
+ final Map <String , Object > properties = client .getConfiguration ().getProperties ();
110
+ final Object threadPoolSize = properties .get (ClientProperties .ASYNC_THREADPOOL_SIZE );
85
111
86
112
if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer ) threadPoolSize > 0 ) {
87
113
executorService = Executors .newFixedThreadPool ((Integer ) threadPoolSize );
@@ -92,20 +118,31 @@ class NettyConnector implements Connector {
92
118
}
93
119
94
120
this .client = client ;
121
+
122
+ final Object maxPoolIdleProperty = properties .get (NettyClientProperties .MAX_CONNECTIONS_TOTAL );
123
+ final Object maxPoolSizeProperty = properties .get (NettyClientProperties .MAX_CONNECTIONS );
124
+
125
+ maxPoolIdle = maxPoolIdleProperty != null ? (Integer ) maxPoolIdleProperty : MAX_POOL_IDLE ;
126
+ maxPoolSize = maxPoolSizeProperty != null
127
+ ? (Integer ) maxPoolSizeProperty
128
+ : (HTTP_KEEPALIVE ? MAX_POOL_SIZE : DEFAULT_MAX_POOL_SIZE );
129
+
130
+ if (maxPoolIdle == null || maxPoolIdle < 0 ) {
131
+ throw new ProcessingException (LocalizationMessages .WRONG_MAX_POOL_IDLE (maxPoolIdle ));
132
+ }
133
+
134
+ if (maxPoolSize == null || maxPoolSize < 0 ) {
135
+ throw new ProcessingException (LocalizationMessages .WRONG_MAX_POOL_SIZE (maxPoolIdle ));
136
+ }
95
137
}
96
138
97
139
@ Override
98
140
public ClientResponse apply (ClientRequest jerseyRequest ) {
99
141
try {
100
- CompletableFuture <ClientResponse > resultFuture = execute (jerseyRequest );
101
-
102
- Integer timeout = jerseyRequest .resolveProperty (ClientProperties .READ_TIMEOUT , 0 );
103
-
104
- return (timeout != null && timeout > 0 ) ? resultFuture .get (timeout , TimeUnit .MILLISECONDS )
105
- : resultFuture .get ();
106
- } catch (ExecutionException ex ) {
107
- Throwable e = ex .getCause () == null ? ex : ex .getCause ();
108
- throw new ProcessingException (e .getMessage (), e );
142
+ return execute (jerseyRequest ).join ();
143
+ } catch (CompletionException cex ) {
144
+ final Throwable t = cex .getCause () == null ? cex : cex .getCause ();
145
+ throw new ProcessingException (t .getMessage (), t );
109
146
} catch (Exception ex ) {
110
147
throw new ProcessingException (ex .getMessage (), ex );
111
148
}
@@ -120,6 +157,11 @@ public Future<?> apply(final ClientRequest jerseyRequest, final AsyncConnectorCa
120
157
}
121
158
122
159
protected CompletableFuture <ClientResponse > execute (final ClientRequest jerseyRequest ) {
160
+ Integer timeout = jerseyRequest .resolveProperty (ClientProperties .READ_TIMEOUT , 0 );
161
+ if (timeout == null || timeout < 0 ) {
162
+ throw new ProcessingException (LocalizationMessages .WRONG_READ_TIMEOUT (timeout ));
163
+ }
164
+
123
165
final CompletableFuture <ClientResponse > responseAvailable = new CompletableFuture <>();
124
166
final CompletableFuture <?> responseDone = new CompletableFuture <>();
125
167
@@ -128,6 +170,7 @@ protected CompletableFuture<ClientResponse> execute(final ClientRequest jerseyRe
128
170
int port = requestUri .getPort () != -1 ? requestUri .getPort () : "https" .equals (requestUri .getScheme ()) ? 443 : 80 ;
129
171
130
172
try {
173
+
131
174
String key = requestUri .getScheme () + "://" + host + ":" + port ;
132
175
ArrayList <Channel > conns ;
133
176
synchronized (connections ) {
@@ -138,9 +181,16 @@ protected CompletableFuture<ClientResponse> execute(final ClientRequest jerseyRe
138
181
}
139
182
}
140
183
141
- Channel chan ;
184
+ Channel chan = null ;
142
185
synchronized (conns ) {
143
- chan = conns .size () == 0 ? null : conns .remove (conns .size () - 1 );
186
+ while (chan == null && !conns .isEmpty ()) {
187
+ chan = conns .remove (conns .size () - 1 );
188
+ chan .pipeline ().remove (INACTIVE_POOLED_CONNECTION_HANDLER );
189
+ chan .pipeline ().remove (PRUNE_INACTIVE_POOL );
190
+ if (!chan .isOpen ()) {
191
+ chan = null ;
192
+ }
193
+ }
144
194
}
145
195
146
196
if (chan == null ) {
@@ -199,16 +249,30 @@ protected void initChannel(SocketChannel ch) throws Exception {
199
249
// will leak
200
250
final Channel ch = chan ;
201
251
JerseyClientHandler clientHandler = new JerseyClientHandler (jerseyRequest , responseAvailable , responseDone );
202
- ch .pipeline ().addLast (clientHandler );
252
+ // read timeout makes sense really as an inactivity timeout
253
+ ch .pipeline ().addLast (READ_TIMEOUT_HANDLER ,
254
+ new IdleStateHandler (0 , 0 , timeout , TimeUnit .MILLISECONDS ));
255
+ ch .pipeline ().addLast (REQUEST_HANDLER , clientHandler );
203
256
204
257
responseDone .whenComplete ((_r , th ) -> {
258
+ ch .pipeline ().remove (READ_TIMEOUT_HANDLER );
205
259
ch .pipeline ().remove (clientHandler );
206
260
207
261
if (th == null ) {
262
+ ch .pipeline ().addLast (INACTIVE_POOLED_CONNECTION_HANDLER , new IdleStateHandler (0 , 0 , maxPoolIdle ));
263
+ ch .pipeline ().addLast (PRUNE_INACTIVE_POOL , new PruneIdlePool (connections , key ));
208
264
synchronized (connections ) {
209
265
ArrayList <Channel > conns1 = connections .get (key );
210
- synchronized (conns1 ) {
266
+ if (conns1 == null ) {
267
+ conns1 = new ArrayList <>(1 );
211
268
conns1 .add (ch );
269
+ connections .put (key , conns1 );
270
+ } else {
271
+ synchronized (conns1 ) {
272
+ if (conns1 .size () < maxPoolSize ) {
273
+ conns1 .add (ch );
274
+ } // else do not add the Channel to the idle pool
275
+ }
212
276
}
213
277
}
214
278
} else {
@@ -331,4 +395,35 @@ private static URI getProxyUri(final Object proxy) {
331
395
throw new ProcessingException (LocalizationMessages .WRONG_PROXY_URI_TYPE (ClientProperties .PROXY_URI ));
332
396
}
333
397
}
398
+
399
+ protected static class PruneIdlePool extends ChannelDuplexHandler {
400
+ HashMap <String , ArrayList <Channel >> connections ;
401
+ String key ;
402
+
403
+ public PruneIdlePool (HashMap <String , ArrayList <Channel >> connections , String key ) {
404
+ this .connections = connections ;
405
+ this .key = key ;
406
+ }
407
+
408
+ @ Override
409
+ public void userEventTriggered (ChannelHandlerContext ctx , Object evt ) throws Exception {
410
+ if (evt instanceof IdleStateEvent ) {
411
+ IdleStateEvent e = (IdleStateEvent ) evt ;
412
+ if (e .state () == IdleState .ALL_IDLE ) {
413
+ ctx .close ();
414
+ synchronized (connections ) {
415
+ ArrayList <Channel > chans = connections .get (key );
416
+ synchronized (chans ) {
417
+ chans .remove (ctx .channel ());
418
+ if (chans .isEmpty ()) {
419
+ connections .remove (key );
420
+ }
421
+ }
422
+ }
423
+ }
424
+ } else {
425
+ super .userEventTriggered (ctx , evt );
426
+ }
427
+ }
428
+ }
334
429
}
0 commit comments