21
21
import java .io .InputStream ;
22
22
import java .util .Map ;
23
23
import java .util .concurrent .CompletableFuture ;
24
- import java .util .concurrent .LinkedBlockingDeque ;
25
24
26
25
import jakarta .ws .rs .core .Response ;
27
26
28
27
import org .glassfish .jersey .client .ClientRequest ;
29
28
import org .glassfish .jersey .client .ClientResponse ;
30
- import org .glassfish .jersey .client .spi .AsyncConnectorCallback ;
31
29
import org .glassfish .jersey .netty .connector .internal .NettyInputStream ;
32
30
33
31
import io .netty .buffer .ByteBuf ;
34
- import io .netty .buffer .Unpooled ;
35
32
import io .netty .channel .ChannelHandlerContext ;
36
33
import io .netty .channel .SimpleChannelInboundHandler ;
37
34
import io .netty .handler .codec .http .HttpContent ;
40
37
import io .netty .handler .codec .http .HttpResponse ;
41
38
import io .netty .handler .codec .http .HttpUtil ;
42
39
import io .netty .handler .codec .http .LastHttpContent ;
43
- import io .netty .util .concurrent .Future ;
44
- import io .netty .util .concurrent .GenericFutureListener ;
45
40
46
41
/**
47
42
* Jersey implementation of Netty channel handler.
50
45
*/
51
46
class JerseyClientHandler extends SimpleChannelInboundHandler <HttpObject > {
52
47
53
- private final NettyConnector connector ;
54
- private final LinkedBlockingDeque <ByteBuf > isList = new LinkedBlockingDeque <>();
55
-
56
- private final AsyncConnectorCallback asyncConnectorCallback ;
57
48
private final ClientRequest jerseyRequest ;
58
- private final CompletableFuture future ;
49
+ private final CompletableFuture <ClientResponse > responseAvailable ;
50
+ private final CompletableFuture <?> responseDone ;
51
+
52
+ private NettyInputStream nis ;
53
+ private ClientResponse jerseyResponse ;
59
54
60
- JerseyClientHandler (NettyConnector nettyConnector , ClientRequest request ,
61
- AsyncConnectorCallback callback , CompletableFuture future ) {
62
- this .connector = nettyConnector ;
63
- this .asyncConnectorCallback = callback ;
55
+ JerseyClientHandler (ClientRequest request ,
56
+ CompletableFuture <ClientResponse > responseAvailable ,
57
+ CompletableFuture <?> responseDone ) {
64
58
this .jerseyRequest = request ;
65
- this .future = future ;
59
+ this .responseAvailable = responseAvailable ;
60
+ this .responseDone = responseDone ;
61
+ }
62
+
63
+ @ Override
64
+ public void channelReadComplete (ChannelHandlerContext ctx ) {
65
+ notifyResponse ();
66
+ }
67
+
68
+ @ Override
69
+ public void channelInactive (ChannelHandlerContext ctx ) {
70
+ // assert: no-op, if channel is closed after LastHttpContent has been consumed
71
+ responseDone .completeExceptionally (new IOException ("Stream closed" ));
72
+ }
73
+
74
+ protected void notifyResponse () {
75
+ if (jerseyResponse != null ) {
76
+ ClientResponse cr = jerseyResponse ;
77
+ jerseyResponse = null ;
78
+ responseAvailable .complete (cr );
79
+ }
66
80
}
67
81
68
82
@ Override
69
83
public void channelRead0 (ChannelHandlerContext ctx , HttpObject msg ) {
70
84
if (msg instanceof HttpResponse ) {
71
85
final HttpResponse response = (HttpResponse ) msg ;
72
86
73
- final ClientResponse jerseyResponse = new ClientResponse (new Response .StatusType () {
87
+ jerseyResponse = new ClientResponse (new Response .StatusType () {
74
88
@ Override
75
89
public int getStatusCode () {
76
90
return response .status ().code ();
@@ -90,19 +104,15 @@ public String getReasonPhrase() {
90
104
for (Map .Entry <String , String > entry : response .headers ().entries ()) {
91
105
jerseyResponse .getHeaders ().add (entry .getKey (), entry .getValue ());
92
106
}
93
- isList . clear (); // clearing the content - possible leftover from previous request processing.
107
+
94
108
// request entity handling.
95
109
if ((response .headers ().contains (HttpHeaderNames .CONTENT_LENGTH ) && HttpUtil .getContentLength (response ) > 0 )
96
110
|| HttpUtil .isTransferEncodingChunked (response )) {
97
111
98
- ctx .channel ().closeFuture ().addListener (new GenericFutureListener <Future <? super Void >>() {
99
- @ Override
100
- public void operationComplete (Future <? super Void > future ) throws Exception {
101
- isList .add (Unpooled .EMPTY_BUFFER );
102
- }
103
- });
112
+ nis = new NettyInputStream ();
113
+ responseDone .whenComplete ((_r , th ) -> nis .complete (th ));
104
114
105
- jerseyResponse .setEntityStream (new NettyInputStream ( isList ) );
115
+ jerseyResponse .setEntityStream (nis );
106
116
} else {
107
117
jerseyResponse .setEntityStream (new InputStream () {
108
118
@ Override
@@ -111,44 +121,29 @@ public int read() throws IOException {
111
121
}
112
122
});
113
123
}
114
-
115
- if (asyncConnectorCallback != null ) {
116
- connector .executorService .execute (new Runnable () {
117
- @ Override
118
- public void run () {
119
- asyncConnectorCallback .response (jerseyResponse );
120
- future .complete (jerseyResponse );
121
- }
122
- });
123
- }
124
-
125
124
}
126
125
if (msg instanceof HttpContent ) {
126
+
127
127
HttpContent httpContent = (HttpContent ) msg ;
128
128
129
129
ByteBuf content = httpContent .content ();
130
+
130
131
if (content .isReadable ()) {
131
132
content .retain ();
132
- isList . add (content );
133
+ nis . publish (content );
133
134
}
134
135
135
136
if (msg instanceof LastHttpContent ) {
136
- isList .add (Unpooled .EMPTY_BUFFER );
137
+ responseDone .complete (null );
138
+ notifyResponse ();
137
139
}
138
140
}
139
141
}
140
142
143
+
144
+
141
145
@ Override
142
146
public void exceptionCaught (ChannelHandlerContext ctx , final Throwable cause ) {
143
- if (asyncConnectorCallback != null ) {
144
- connector .executorService .execute (new Runnable () {
145
- @ Override
146
- public void run () {
147
- asyncConnectorCallback .failure (cause );
148
- }
149
- });
150
- }
151
- future .completeExceptionally (cause );
152
- ctx .close ();
147
+ responseDone .completeExceptionally (cause );
153
148
}
154
149
}
0 commit comments