Skip to content

Commit 30dbb3e

Browse files
010gvrVenkat
authored and
Venkat
committedNov 6, 2019
Rewritten Netty Jersey implementation using direct ByteBuf consumption
Signed-off-by: Venkat Ganesh <010gvr@gmail.com>
1 parent 72c27bc commit 30dbb3e

File tree

6 files changed

+132
-141
lines changed

6 files changed

+132
-141
lines changed
 

‎connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java

+8-13
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.glassfish.jersey.netty.connector;
1818

19-
import java.io.ByteArrayInputStream;
2019
import java.io.IOException;
2120
import java.io.InputStream;
2221
import java.util.Map;
@@ -31,6 +30,7 @@
3130
import org.glassfish.jersey.netty.connector.internal.NettyInputStream;
3231

3332
import io.netty.buffer.ByteBuf;
33+
import io.netty.buffer.Unpooled;
3434
import io.netty.channel.ChannelHandlerContext;
3535
import io.netty.channel.SimpleChannelInboundHandler;
3636
import io.netty.handler.codec.http.HttpContent;
@@ -50,7 +50,7 @@
5050
class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
5151

5252
private final NettyConnector connector;
53-
private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
53+
private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
5454

5555
private final AsyncConnectorCallback asyncConnectorCallback;
5656
private final ClientRequest jerseyRequest;
@@ -89,15 +89,15 @@ public String getReasonPhrase() {
8989
for (Map.Entry<String, String> entry : response.headers().entries()) {
9090
jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue());
9191
}
92-
92+
isList.clear(); // clearing the content - possible leftover from previous request processing.
9393
// request entity handling.
9494
if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0)
9595
|| HttpUtil.isTransferEncodingChunked(response)) {
9696

9797
ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
9898
@Override
9999
public void operationComplete(Future<? super Void> future) throws Exception {
100-
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
100+
isList.add(Unpooled.EMPTY_BUFFER);
101101
}
102102
});
103103

@@ -123,21 +123,16 @@ public void run() {
123123

124124
}
125125
if (msg instanceof HttpContent) {
126-
127126
HttpContent httpContent = (HttpContent) msg;
128127

129128
ByteBuf content = httpContent.content();
130-
131129
if (content.isReadable()) {
132-
// copy bytes - when netty reads last chunk, it automatically closes the channel, which invalidates all
133-
// relates ByteBuffs.
134-
byte[] bytes = new byte[content.readableBytes()];
135-
content.getBytes(content.readerIndex(), bytes);
136-
isList.add(new ByteArrayInputStream(bytes));
130+
content.retain();
131+
isList.add(content);
137132
}
138133

139134
if (msg instanceof LastHttpContent) {
140-
isList.add(NettyInputStream.END_OF_INPUT);
135+
isList.add(Unpooled.EMPTY_BUFFER);
141136
}
142137
}
143138
}
@@ -153,6 +148,6 @@ public void run() {
153148
});
154149
}
155150
future.completeExceptionally(cause);
156-
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
151+
ctx.close();
157152
}
158153
}

‎connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@
4242
public class JerseyChunkedInput extends OutputStream implements ChunkedInput<ByteBuf>, ChannelFutureListener {
4343

4444
private static final ByteBuffer VOID = ByteBuffer.allocate(0);
45-
private static final int CAPACITY = 8;
46-
// TODO this needs to be configurable, see JERSEY-3228
47-
private static final int WRITE_TIMEOUT = 10000;
48-
private static final int READ_TIMEOUT = 10000;
45+
private static final int CAPACITY = Integer.getInteger("jersey.ci.capacity", 8);
46+
private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.ci.read.timeout", 10000);
47+
private static final int READ_TIMEOUT = Integer.getInteger("jersey.ci.write.timeout", 10000);
4948

5049
private final LinkedBlockingDeque<ByteBuffer> queue = new LinkedBlockingDeque<>(CAPACITY);
5150
private final Channel ctx;

‎connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java

+61-71
Original file line numberDiff line numberDiff line change
@@ -20,77 +20,47 @@
2020
import java.io.InputStream;
2121
import java.util.concurrent.LinkedBlockingDeque;
2222

23+
import io.netty.buffer.ByteBuf;
24+
import io.netty.buffer.Unpooled;
25+
2326
/**
2427
* Input stream which servers as Request entity input.
2528
* <p>
26-
* Converts Netty NIO buffers to an input streams and stores them in the queue,
27-
* waiting for Jersey to process it.
28-
*
29-
* @author Pavel Bucek
29+
* Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey
3030
*/
3131
public class NettyInputStream extends InputStream {
3232

33-
private volatile boolean end = false;
34-
35-
/**
36-
* End of input.
37-
*/
38-
public static final InputStream END_OF_INPUT = new InputStream() {
39-
@Override
40-
public int read() throws IOException {
41-
return 0;
42-
}
43-
44-
@Override
45-
public String toString() {
46-
return "END_OF_INPUT " + super.toString();
47-
}
48-
};
49-
50-
/**
51-
* Unexpected end of input.
52-
*/
53-
public static final InputStream END_OF_INPUT_ERROR = new InputStream() {
54-
@Override
55-
public int read() throws IOException {
56-
return 0;
57-
}
58-
59-
@Override
60-
public String toString() {
61-
return "END_OF_INPUT_ERROR " + super.toString();
62-
}
63-
};
64-
65-
private final LinkedBlockingDeque<InputStream> isList;
33+
private final LinkedBlockingDeque<ByteBuf> isList;
6634

67-
public NettyInputStream(LinkedBlockingDeque<InputStream> isList) {
35+
public NettyInputStream(LinkedBlockingDeque<ByteBuf> isList) {
6836
this.isList = isList;
6937
}
7038

71-
private interface ISReader {
72-
int readFrom(InputStream take) throws IOException;
73-
}
74-
75-
private int readInternal(ISReader isReader) throws IOException {
76-
if (end) {
77-
return -1;
78-
}
39+
@Override
40+
public int read(byte[] b, int off, int len) throws IOException {
7941

80-
InputStream take;
42+
ByteBuf take;
8143
try {
8244
take = isList.take();
83-
84-
if (checkEndOfInput(take)) {
45+
boolean isReadable = take.isReadable();
46+
int read = -1;
47+
if (checkEndOfInputOrError(take)) {
48+
take.release();
8549
return -1;
8650
}
8751

88-
int read = isReader.readFrom(take);
89-
90-
if (take.available() > 0) {
91-
isList.addFirst(take);
52+
if (isReadable) {
53+
int readableBytes = take.readableBytes();
54+
read = Math.min(readableBytes, len);
55+
take.readBytes(b, off, read);
56+
if (read < len) {
57+
take.release();
58+
} else {
59+
isList.addFirst(take);
60+
}
9261
} else {
93-
take.close();
62+
read = 0;
63+
take.release(); //We don't need `0`
9464
}
9565

9666
return read;
@@ -100,33 +70,53 @@ private int readInternal(ISReader isReader) throws IOException {
10070
}
10171

10272
@Override
103-
public int read(byte[] b, int off, int len) throws IOException {
104-
return readInternal(take -> take.read(b, off, len));
73+
public int read() throws IOException {
74+
75+
ByteBuf take;
76+
try {
77+
take = isList.take();
78+
boolean isReadable = take.isReadable();
79+
if (checkEndOfInputOrError(take)) {
80+
take.release();
81+
return -1;
82+
}
83+
84+
if (isReadable) {
85+
return take.readInt();
86+
} else {
87+
take.release(); //We don't need `0`
88+
}
89+
90+
return 0;
91+
} catch (InterruptedException e) {
92+
throw new IOException("Interrupted.", e);
93+
}
10594
}
10695

10796
@Override
108-
public int read() throws IOException {
109-
return readInternal(InputStream::read);
97+
public void close() throws IOException {
98+
if (isList != null) {
99+
while (!isList.isEmpty()) {
100+
try {
101+
isList.take().release();
102+
} catch (InterruptedException e) {
103+
throw new IOException("Interrupted. Potential ByteBuf Leak.", e);
104+
}
105+
}
106+
}
107+
super.close();
110108
}
111109

112110
@Override
113111
public int available() throws IOException {
114-
InputStream peek = isList.peek();
115-
if (peek != null) {
116-
return peek.available();
112+
ByteBuf peek = isList.peek();
113+
if (peek != null && peek.isReadable()) {
114+
return peek.readableBytes();
117115
}
118-
119116
return 0;
120117
}
121118

122-
private boolean checkEndOfInput(InputStream take) throws IOException {
123-
if (take == END_OF_INPUT) {
124-
end = true;
125-
return true;
126-
} else if (take == END_OF_INPUT_ERROR) {
127-
end = true;
128-
throw new IOException("Connection was closed prematurely.");
129-
}
130-
return false;
119+
private boolean checkEndOfInputOrError(ByteBuf take) throws IOException {
120+
return take == Unpooled.EMPTY_BUFFER;
131121
}
132122
}

‎containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import java.util.concurrent.LinkedBlockingDeque;
2929

3030
import javax.ws.rs.core.SecurityContext;
31-
32-
import io.netty.buffer.ByteBufInputStream;
31+
import io.netty.buffer.ByteBuf;
32+
import io.netty.buffer.Unpooled;
3333
import io.netty.channel.ChannelDuplexHandler;
3434
import io.netty.channel.ChannelHandler;
3535
import io.netty.channel.ChannelHandlerContext;
@@ -55,7 +55,7 @@
5555
class JerseyHttp2ServerHandler extends ChannelDuplexHandler {
5656

5757
private final URI baseUri;
58-
private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
58+
private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
5959
private final NettyHttpContainer container;
6060
private final ResourceConfig resourceConfig;
6161

@@ -92,9 +92,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
9292
* Process incoming data.
9393
*/
9494
private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
95-
isList.add(new ByteBufInputStream(data.content(), true));
95+
isList.add(data.content());
9696
if (data.isEndStream()) {
97-
isList.add(NettyInputStream.END_OF_INPUT);
97+
isList.add(Unpooled.EMPTY_BUFFER);
9898
}
9999
}
100100

@@ -163,7 +163,7 @@ public void removeProperty(String name) {
163163
ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
164164
@Override
165165
public void operationComplete(Future<? super Void> future) throws Exception {
166-
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
166+
isList.add(Unpooled.EMPTY_BUFFER);
167167
}
168168
});
169169

0 commit comments

Comments
 (0)