Skip to content

Commit 51abc78

Browse files
authored
Fix issues with ChunkedInputStream when using Apache Connector (#4338)
* Set Apache Connector behaviour for Apache Http Client prior 4.5.1 to behaviour in Jersey 2.28 Keep behaviour of Jersey 2.29 for Apache HttpClient 4.5.1+ Signed-off-by: Jan Supol <jan.supol@oracle.com>
1 parent e59b36e commit 51abc78

File tree

7 files changed

+385
-35
lines changed

7 files changed

+385
-35
lines changed

connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheClientProperties.java

+9
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,15 @@ public final class ApacheClientProperties {
156156
*/
157157
public static final String KEEPALIVE_STRATEGY = "jersey.config.apache.client.keepAliveStrategy";
158158

159+
160+
/**
161+
* Strategy that closes the Apache Connection. Accepts an instance of {@link ApacheConnectionClosingStrategy}.
162+
*
163+
* @see ApacheConnectionClosingStrategy
164+
* @since 2.30
165+
*/
166+
public static final String CONNECTION_CLOSING_STRATEGY = "jersey.config.apache.client.connectionClosingStrategy";
167+
159168
/**
160169
* Get the value of the specified property.
161170
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* This Source Code may also be made available under the following Secondary
9+
* Licenses when the conditions for such availability set forth in the
10+
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11+
* version 2 with the GNU Classpath Exception, which is available at
12+
* https://www.gnu.org/software/classpath/license.html.
13+
*
14+
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15+
*/
16+
17+
package org.glassfish.jersey.apache.connector;
18+
19+
import org.apache.http.client.methods.CloseableHttpResponse;
20+
import org.apache.http.client.methods.HttpUriRequest;
21+
import org.glassfish.jersey.client.ClientRequest;
22+
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
26+
/**
27+
* /**
28+
* Strategy that defines the way the Apache client releases resources. The client enables closing the content stream
29+
* and the response. From the Apache documentation:
30+
* <pre>
31+
* The difference between closing the content stream and closing the response is that
32+
* the former will attempt to keep the underlying connection alive by consuming the
33+
* entity content while the latter immediately shuts down and discards the connection.
34+
* </pre>
35+
* With Apache Client before 4.5.1, it was ok to close the response and the content stream. This is the default for
36+
* Apache Client 4.5 and older.
37+
* <p/>
38+
* For Apache Client 4.5.1+, first the content stream and the response is should be closed.
39+
* <p/>
40+
* In the case of Chunk content stream, the stream is not closed on the server side, and the client can hung on reading
41+
* the closing chunk. Using the {@link org.glassfish.jersey.client.ClientProperties#READ_TIMEOUT} property can prevent
42+
* this hanging forever and the reading of the closing chunk is terminated when the time is out. The other option, when
43+
* the timeout is not set, is to abort the Apache client request. This is the default for Apache Client 4.5.1+ when the
44+
* read timeout is not set.
45+
* <p/>
46+
* Another option is not to close the content stream, which is possible by the Apache client documentation. In this case,
47+
* however, the server side may not be notified and would not not close its chunk stream.
48+
*/
49+
public interface ApacheConnectionClosingStrategy {
50+
/**
51+
* Method to close the connection.
52+
* @param clientRequest The {@link ClientRequest} to get {@link ClientRequest#getConfiguration() configuration},
53+
* and {@link ClientRequest#resolveProperty(String, Class) resolve properties}.
54+
* @param request Apache {@code HttpUriRequest} that can be {@code abort}ed.
55+
* @param response Apache {@code CloseableHttpResponse} that can be {@code close}d.
56+
* @param stream The entity stream that can be {@link InputStream#close() closed}.
57+
* @throws IOException In case of some of the closing methods throws {@link IOException}
58+
*/
59+
void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
60+
throws IOException;
61+
62+
/**
63+
* Strategy that aborts Apache HttpRequests for the case of Chunked Stream, closes the stream, and response next.
64+
*/
65+
class GracefulClosingStrategy implements ApacheConnectionClosingStrategy {
66+
static final GracefulClosingStrategy INSTANCE = new GracefulClosingStrategy();
67+
68+
@Override
69+
public void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
70+
throws IOException {
71+
if (response.getEntity() != null && response.getEntity().isChunked()) {
72+
request.abort();
73+
}
74+
try {
75+
stream.close();
76+
} catch (IOException ex) {
77+
// Ignore
78+
} finally {
79+
response.close();
80+
}
81+
}
82+
}
83+
84+
/**
85+
* Strategy that closes the response and content stream next. This is a behaviour of Jersey 2.28.
86+
*/
87+
class ImmediateClosingStrategy implements ApacheConnectionClosingStrategy {
88+
static final ImmediateClosingStrategy INSTANCE = new ImmediateClosingStrategy();
89+
90+
@Override
91+
public void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
92+
throws IOException {
93+
response.close();
94+
stream.close();
95+
}
96+
}
97+
}

connectors/apache-connector/src/main/java/org/glassfish/jersey/apache/connector/ApacheConnector.java

+63-21
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@
2626
import java.io.OutputStream;
2727
import java.net.URI;
2828
import java.util.ArrayList;
29+
import java.util.LinkedList;
2930
import java.util.List;
3031
import java.util.Map;
31-
import java.util.Optional;
3232
import java.util.concurrent.CompletableFuture;
3333
import java.util.concurrent.Future;
3434
import java.util.concurrent.atomic.AtomicLong;
3535
import java.util.logging.Level;
3636
import java.util.logging.Logger;
37+
import java.util.stream.Collectors;
3738

3839
import javax.ws.rs.ProcessingException;
3940
import javax.ws.rs.client.Client;
@@ -173,7 +174,6 @@
173174
class ApacheConnector implements Connector {
174175

175176
private static final Logger LOGGER = Logger.getLogger(ApacheConnector.class.getName());
176-
177177
private static final VersionInfo vi;
178178
private static final String release;
179179

@@ -325,14 +325,16 @@ class ApacheConnector implements Connector {
325325
}
326326
clientBuilder.setDefaultRequestConfig(requestConfig);
327327

328-
Optional<Object> contract = config.getInstances().stream()
329-
.filter(a -> ApacheHttpClientBuilderConfigurator.class.isInstance(a)).findFirst();
328+
LinkedList<Object> contracts = config.getInstances().stream()
329+
.filter(ApacheHttpClientBuilderConfigurator.class::isInstance)
330+
.collect(Collectors.toCollection(LinkedList::new));
330331

331-
final HttpClientBuilder configuredBuilder = contract.isPresent()
332-
? ((ApacheHttpClientBuilderConfigurator) contract.get()).configure(clientBuilder)
333-
: null;
332+
HttpClientBuilder configuredBuilder = clientBuilder;
333+
for (Object configurator : contracts) {
334+
configuredBuilder = ((ApacheHttpClientBuilderConfigurator) configurator).configure(configuredBuilder);
335+
}
334336

335-
this.client = configuredBuilder != null ? configuredBuilder.build() : clientBuilder.build();
337+
this.client = configuredBuilder.build();
336338
}
337339

338340
private HttpClientConnectionManager getConnectionManager(final Client client,
@@ -515,7 +517,8 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing
515517
}
516518

517519
try {
518-
responseContext.setEntityStream(getInputStream(response));
520+
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
521+
responseContext.setEntityStream(getInputStream(response, closingMechanism));
519522
} catch (final IOException e) {
520523
LOGGER.log(Level.SEVERE, null, e);
521524
}
@@ -655,8 +658,8 @@ private static Map<String, String> writeOutBoundHeaders(final ClientRequest clie
655658
return stringHeaders;
656659
}
657660

658-
private static InputStream getInputStream(final CloseableHttpResponse response) throws IOException {
659-
661+
private static InputStream getInputStream(final CloseableHttpResponse response,
662+
final ConnectionClosingMechanism closingMechanism) throws IOException {
660663
final InputStream inputStream;
661664

662665
if (response.getEntity() == null) {
@@ -670,18 +673,57 @@ private static InputStream getInputStream(final CloseableHttpResponse response)
670673
}
671674
}
672675

673-
return new FilterInputStream(inputStream) {
674-
@Override
675-
public void close() throws IOException {
676-
try {
677-
super.close();
678-
} catch (IOException ex) {
679-
// Ignore
680-
} finally {
681-
response.close();
676+
return closingMechanism.getEntityStream(inputStream, response);
677+
}
678+
679+
/**
680+
* The way the Apache CloseableHttpResponse is to be closed.
681+
* See https://github.com/eclipse-ee4j/jersey/issues/4321
682+
* {@link ApacheClientProperties#CONNECTION_CLOSING_STRATEGY}
683+
*/
684+
private final class ConnectionClosingMechanism {
685+
private ApacheConnectionClosingStrategy connectionClosingStrategy = null;
686+
private final ClientRequest clientRequest;
687+
private final HttpUriRequest apacheRequest;
688+
689+
private ConnectionClosingMechanism(ClientRequest clientRequest, HttpUriRequest apacheRequest) {
690+
this.clientRequest = clientRequest;
691+
this.apacheRequest = apacheRequest;
692+
Object closingStrategyProperty = clientRequest
693+
.resolveProperty(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY, Object.class);
694+
if (closingStrategyProperty != null) {
695+
if (ApacheConnectionClosingStrategy.class.isInstance(closingStrategyProperty)) {
696+
connectionClosingStrategy = (ApacheConnectionClosingStrategy) closingStrategyProperty;
697+
} else {
698+
LOGGER.log(
699+
Level.WARNING,
700+
LocalizationMessages.IGNORING_VALUE_OF_PROPERTY(
701+
ApacheClientProperties.CONNECTION_CLOSING_STRATEGY,
702+
closingStrategyProperty,
703+
ApacheConnectionClosingStrategy.class.getName())
704+
);
682705
}
683706
}
684-
};
707+
708+
if (connectionClosingStrategy == null) {
709+
if (vi.getRelease().compareTo("4.5") > 0) {
710+
connectionClosingStrategy = ApacheConnectionClosingStrategy.GracefulClosingStrategy.INSTANCE;
711+
} else {
712+
connectionClosingStrategy = ApacheConnectionClosingStrategy.ImmediateClosingStrategy.INSTANCE;
713+
}
714+
}
715+
}
716+
717+
private InputStream getEntityStream(final InputStream inputStream,
718+
final CloseableHttpResponse response) {
719+
InputStream filterStream = new FilterInputStream(inputStream) {
720+
@Override
721+
public void close() throws IOException {
722+
connectionClosingStrategy.close(clientRequest, apacheRequest, response, in);
723+
}
724+
};
725+
return filterStream;
726+
}
685727
}
686728

687729
private static class ConnectionFactory extends ManagedHttpClientConnectionFactory {

connectors/apache-connector/src/test/java/org/glassfish/jersey/apache/connector/StreamingTest.java

+45-14
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import java.io.IOException;
2020
import java.io.InputStream;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import javax.ws.rs.GET;
2324
import javax.ws.rs.Path;
2425
import javax.ws.rs.Produces;
26+
import javax.ws.rs.client.Invocation;
2527
import javax.ws.rs.client.WebTarget;
2628
import javax.ws.rs.core.Application;
2729
import javax.ws.rs.core.MediaType;
@@ -49,21 +51,13 @@ public class StreamingTest extends JerseyTest {
4951
* Test that a data stream can be terminated from the client side.
5052
*/
5153
@Test
52-
public void clientCloseTest() throws IOException {
53-
// start streaming
54-
InputStream inputStream = target().path("/streamingEndpoint").request()
55-
.property(ClientProperties.READ_TIMEOUT, 1_000).get(InputStream.class);
54+
public void clientCloseNoTimeoutTest() throws IOException {
55+
clientCloseTest(-1);
56+
}
5657

57-
WebTarget sendTarget = target().path("/streamingEndpoint/send");
58-
// trigger sending 'A' to the stream; OK is sent if everything on the server was OK
59-
assertEquals("OK", sendTarget.request().get().readEntity(String.class));
60-
// check 'A' has been sent
61-
assertEquals('A', inputStream.read());
62-
// closing the stream should tear down the connection
63-
inputStream.close();
64-
// trigger sending another 'A' to the stream; it should fail
65-
// (indicating that the streaming has been terminated on the server)
66-
assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
58+
@Test
59+
public void clientCloseWithTimeOutTest() throws IOException {
60+
clientCloseTest(1_000);
6761
}
6862

6963
/**
@@ -103,6 +97,43 @@ protected Application configure() {
10397
return new ResourceConfig(StreamingEndpoint.class);
10498
}
10599

100+
/**
101+
* Test that a data stream can be terminated from the client side.
102+
*/
103+
private void clientCloseTest(int readTimeout) throws IOException {
104+
// start streaming
105+
AtomicInteger counter = new AtomicInteger(0);
106+
Invocation.Builder builder = target().path("/streamingEndpoint").request();
107+
if (readTimeout > -1) {
108+
counter.set(1);
109+
builder.property(ClientProperties.READ_TIMEOUT, readTimeout);
110+
builder.property(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY,
111+
(ApacheConnectionClosingStrategy) (config, request, response, stream) -> {
112+
try {
113+
stream.close();
114+
} catch (Exception e) {
115+
// timeout, no chunk ending
116+
} finally {
117+
counter.set(0);
118+
response.close();
119+
}
120+
});
121+
}
122+
InputStream inputStream = builder.get(InputStream.class);
123+
124+
WebTarget sendTarget = target().path("/streamingEndpoint/send");
125+
// trigger sending 'A' to the stream; OK is sent if everything on the server was OK
126+
assertEquals("OK", sendTarget.request().get().readEntity(String.class));
127+
// check 'A' has been sent
128+
assertEquals('A', inputStream.read());
129+
// closing the stream should tear down the connection
130+
inputStream.close();
131+
// trigger sending another 'A' to the stream; it should fail
132+
// (indicating that the streaming has been terminated on the server)
133+
assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
134+
assertEquals(0, counter.get());
135+
}
136+
106137
@Singleton
107138
@Path("streamingEndpoint")
108139
public static class StreamingEndpoint {

0 commit comments

Comments
 (0)