Skip to content

Commit 73345be

Browse files
committed
feat(server): add http1_half_close(bool) option
This option determines whether a read EOF should close the connection automatically. The behavior was to always allow read EOF while waiting to respond, so this option has a default of `true`. Setting this option to `false` will allow Service futures to be canceled as soon as disconnect is noticed. Closes #1716
1 parent c35bdca commit 73345be

File tree

4 files changed

+121
-5
lines changed

4 files changed

+121
-5
lines changed

src/proto/h1/conn.rs

+22-5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ where I: AsyncRead + AsyncWrite,
3838
Conn {
3939
io: Buffered::new(io),
4040
state: State {
41+
allow_half_close: true,
4142
cached_headers: None,
4243
error: None,
4344
keep_alive: KA::Busy,
@@ -75,6 +76,10 @@ where I: AsyncRead + AsyncWrite,
7576
self.state.title_case_headers = true;
7677
}
7778

79+
pub(crate) fn set_disable_half_close(&mut self) {
80+
self.state.allow_half_close = false;
81+
}
82+
7883
pub fn into_inner(self) -> (I, Bytes) {
7984
self.io.into_inner()
8085
}
@@ -228,10 +233,11 @@ where I: AsyncRead + AsyncWrite,
228233

229234
trace!("read_keep_alive; is_mid_message={}", self.is_mid_message());
230235

231-
if !self.is_mid_message() {
232-
self.require_empty_read().map_err(::Error::new_io)?;
236+
if self.is_mid_message() {
237+
self.mid_message_detect_eof().map_err(::Error::new_io)
238+
} else {
239+
self.require_empty_read().map_err(::Error::new_io)
233240
}
234-
Ok(())
235241
}
236242

237243
fn is_mid_message(&self) -> bool {
@@ -252,7 +258,7 @@ where I: AsyncRead + AsyncWrite,
252258
// This should only be called for Clients wanting to enter the idle
253259
// state.
254260
fn require_empty_read(&mut self) -> io::Result<()> {
255-
assert!(!self.can_read_head() && !self.can_read_body());
261+
debug_assert!(!self.can_read_head() && !self.can_read_body());
256262

257263
if !self.io.read_buf().is_empty() {
258264
debug!("received an unexpected {} bytes", self.io.read_buf().len());
@@ -279,11 +285,21 @@ where I: AsyncRead + AsyncWrite,
279285
}
280286
}
281287

288+
fn mid_message_detect_eof(&mut self) -> io::Result<()> {
289+
debug_assert!(!self.can_read_head() && !self.can_read_body());
290+
291+
if self.state.allow_half_close || !self.io.read_buf().is_empty() {
292+
Ok(())
293+
} else {
294+
self.try_io_read().map(|_| ())
295+
}
296+
}
297+
282298
fn try_io_read(&mut self) -> Poll<usize, io::Error> {
283299
match self.io.read_from_io() {
284300
Ok(Async::Ready(0)) => {
285301
trace!("try_io_read; found EOF on connection: {:?}", self.state);
286-
let must_error = self.should_error_on_eof();
302+
let must_error = !self.state.is_idle();
287303
let ret = if must_error {
288304
let desc = if self.is_mid_message() {
289305
"unexpected EOF waiting for response"
@@ -655,6 +671,7 @@ impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
655671
}
656672

657673
struct State {
674+
allow_half_close: bool,
658675
/// Re-usable HeaderMap to reduce allocating new ones.
659676
cached_headers: Option<HeaderMap>,
660677
/// If an error occurs when there wasn't a direct way to return it

src/server/conn.rs

+20
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub(super) use self::upgrades::UpgradeableConnection;
4545
#[derive(Clone, Debug)]
4646
pub struct Http<E = Exec> {
4747
exec: E,
48+
h1_half_close: bool,
4849
h1_writev: bool,
4950
mode: ConnectionMode,
5051
keep_alive: bool,
@@ -163,6 +164,7 @@ impl Http {
163164
pub fn new() -> Http {
164165
Http {
165166
exec: Exec::Default,
167+
h1_half_close: true,
166168
h1_writev: true,
167169
mode: ConnectionMode::Fallback,
168170
keep_alive: true,
@@ -195,6 +197,20 @@ impl<E> Http<E> {
195197
self
196198
}
197199

200+
/// Set whether HTTP/1 connections should support half-closures.
201+
///
202+
/// Clients can chose to shutdown their write-side while waiting
203+
/// for the server to respond. Setting this to `false` will
204+
/// automatically close any connection immediately if `read`
205+
/// detects an EOF.
206+
///
207+
/// Default is `true`.
208+
#[inline]
209+
pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
210+
self.h1_half_close = val;
211+
self
212+
}
213+
198214
/// Set whether HTTP/1 connections should try to use vectored writes,
199215
/// or always flatten into a single buffer.
200216
///
@@ -261,6 +277,7 @@ impl<E> Http<E> {
261277
pub fn with_executor<E2>(self, exec: E2) -> Http<E2> {
262278
Http {
263279
exec,
280+
h1_half_close: self.h1_half_close,
264281
h1_writev: self.h1_writev,
265282
mode: self.mode,
266283
keep_alive: self.keep_alive,
@@ -319,6 +336,9 @@ impl<E> Http<E> {
319336
if !self.keep_alive {
320337
conn.disable_keep_alive();
321338
}
339+
if !self.h1_half_close {
340+
conn.set_disable_half_close();
341+
}
322342
if !self.h1_writev {
323343
conn.set_write_strategy_flatten();
324344
}

src/server/mod.rs

+14
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,20 @@ impl<I, E> Builder<I, E> {
247247
self
248248
}
249249

250+
251+
/// Set whether HTTP/1 connections should support half-closures.
252+
///
253+
/// Clients can chose to shutdown their write-side while waiting
254+
/// for the server to respond. Setting this to `false` will
255+
/// automatically close any connection immediately if `read`
256+
/// detects an EOF.
257+
///
258+
/// Default is `true`.
259+
pub fn http1_half_close(mut self, val: bool) -> Self {
260+
self.protocol.http1_half_close(val);
261+
self
262+
}
263+
250264
/// Sets whether HTTP/1 is required.
251265
///
252266
/// Default is `false`.

tests/server.rs

+65
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,71 @@ fn nonempty_parse_eof_returns_error() {
991991
rt.block_on(fut).expect_err("partial parse eof is error");
992992
}
993993

994+
#[test]
995+
fn socket_half_closed() {
996+
let _ = pretty_env_logger::try_init();
997+
let mut rt = Runtime::new().unwrap();
998+
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
999+
let addr = listener.local_addr().unwrap();
1000+
1001+
thread::spawn(move || {
1002+
let mut tcp = connect(&addr);
1003+
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
1004+
tcp.shutdown(::std::net::Shutdown::Write).expect("SHDN_WR");
1005+
1006+
let mut buf = [0; 256];
1007+
tcp.read(&mut buf).unwrap();
1008+
let expected = "HTTP/1.1 200 OK\r\n";
1009+
assert_eq!(s(&buf[..expected.len()]), expected);
1010+
});
1011+
1012+
let fut = listener.incoming()
1013+
.into_future()
1014+
.map_err(|_| unreachable!())
1015+
.and_then(|(item, _incoming)| {
1016+
let socket = item.unwrap();
1017+
Http::new()
1018+
.serve_connection(socket, service_fn(|_| {
1019+
Delay::new(Duration::from_millis(500))
1020+
.map(|_| {
1021+
Response::new(Body::empty())
1022+
})
1023+
}))
1024+
});
1025+
1026+
rt.block_on(fut).unwrap();
1027+
}
1028+
1029+
#[test]
1030+
fn disconnect_after_reading_request_before_responding() {
1031+
let _ = pretty_env_logger::try_init();
1032+
let mut rt = Runtime::new().unwrap();
1033+
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
1034+
let addr = listener.local_addr().unwrap();
1035+
1036+
thread::spawn(move || {
1037+
let mut tcp = connect(&addr);
1038+
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
1039+
});
1040+
1041+
let fut = listener.incoming()
1042+
.into_future()
1043+
.map_err(|_| unreachable!())
1044+
.and_then(|(item, _incoming)| {
1045+
let socket = item.unwrap();
1046+
Http::new()
1047+
.http1_half_close(false)
1048+
.serve_connection(socket, service_fn(|_| {
1049+
Delay::new(Duration::from_secs(2))
1050+
.map(|_| -> Response<Body> {
1051+
panic!("response future should have been dropped");
1052+
})
1053+
}))
1054+
});
1055+
1056+
rt.block_on(fut).expect_err("socket disconnected");
1057+
}
1058+
9941059
#[test]
9951060
fn returning_1xx_response_is_error() {
9961061
let mut rt = Runtime::new().unwrap();

0 commit comments

Comments
 (0)