Skip to content

Commit

Permalink
Upgrade to Undertow 2.3.18.Final, dispatch in UndertowHttpHandlerAdapter
Browse files Browse the repository at this point in the history
This ensures that the reactive handling of the request is dispatched
from the Undertow IO thread, marking the exchange as async rather than
ending it once the Undertow `handleRequest` method returns.

Closes gh-33885
  • Loading branch information
simonbasle committed Nov 14, 2024
1 parent 56525da commit 35b452b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
2 changes: 1 addition & 1 deletion framework-platform/framework-platform.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ dependencies {
api("io.r2dbc:r2dbc-spi:1.0.0.RELEASE")
api("io.reactivex.rxjava3:rxjava:3.1.9")
api("io.smallrye.reactive:mutiny:1.10.0")
api("io.undertow:undertow-core:2.3.17.Final")
api("io.undertow:undertow-core:2.3.18.Final")
api("io.undertow:undertow-servlet:2.3.17.Final")
api("io.undertow:undertow-websockets-jsr:2.3.17.Final")
api("io.vavr:vavr:0.10.4")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,27 @@ public DataBufferFactory getDataBufferFactory() {

@Override
public void handleRequest(HttpServerExchange exchange) {
UndertowServerHttpRequest request = null;
try {
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
}
catch (URISyntaxException ex) {
if (logger.isWarnEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
exchange.dispatch(() -> {
UndertowServerHttpRequest request = null;
try {
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
}
exchange.setStatusCode(400);
return;
}
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
catch (URISyntaxException ex) {
if (logger.isWarnEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
exchange.setStatusCode(400);
return;
}
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);

if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}

HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
this.httpHandler.handle(request, response).subscribe(resultSubscriber);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
this.httpHandler.handle(request, response).subscribe(resultSubscriber);
});
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.web.reactive.result.method.annotation;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -331,6 +332,17 @@ void personTransformWithFlux(HttpServer httpServer) throws Exception {
assertThat(performPost("/person-transform/flux", JSON, req, JSON, PERSON_LIST).getBody()).isEqualTo(res);
}

@ParameterizedHttpServerTest // see gh-33885
void personTransformWithFluxDelayed(HttpServer httpServer) throws Exception {
startServer(httpServer);

List<?> req = asList(new Person("Robert"), new Person("Marie"));
List<?> res = asList(new Person("ROBERT"), new Person("MARIE"));
assertThat(performPost("/person-transform/flux-delayed", JSON, req, JSON, PERSON_LIST))
.satisfies(r -> assertThat(r.getBody()).isEqualTo(res))
.satisfies(r -> assertThat(r.getHeaders().getContentLength()).isNotZero());
}

@ParameterizedHttpServerTest
void personTransformWithObservable(HttpServer httpServer) throws Exception {
startServer(httpServer);
Expand Down Expand Up @@ -632,6 +644,11 @@ Flux<Person> transformFlux(@RequestBody Flux<Person> persons) {
return persons.map(person -> new Person(person.getName().toUpperCase()));
}

@PostMapping("/flux-delayed")
Flux<Person> transformDelayed(@RequestBody Flux<Person> persons) {
return transformFlux(persons).delayElements(Duration.ofMillis(10));
}

@PostMapping("/observable")
Observable<Person> transformObservable(@RequestBody Observable<Person> persons) {
return persons.map(person -> new Person(person.getName().toUpperCase()));
Expand Down

0 comments on commit 35b452b

Please # to comment.