Skip to content

Commit dc619a8

Browse files
committed
fix(client): detect connection closes as pool tries to use
Currently, if the remote closes the connection at the same time that the pool selects it to use for a new request, the connection may actually hang. This fix will now more allow the keep-alive read to check the socket even when the `Conn` think it's busy. If the connection was closed before the request write happened, returns back an `Error::Cancel`, letting the user know they could safely retry it. Closes #1439
1 parent a9413d7 commit dc619a8

File tree

6 files changed

+163
-90
lines changed

6 files changed

+163
-90
lines changed

src/client/dispatch.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl<T, U> Drop for Receiver<T, U> {
8585
// - Err: unreachable
8686
while let Ok(Async::Ready(Some((_val, cb)))) = self.inner.poll() {
8787
// maybe in future, we pass the value along with the error?
88-
let _ = cb.send(Err(::Error::new_canceled()));
88+
let _ = cb.send(Err(::Error::new_canceled(None)));
8989
}
9090
}
9191

src/client/mod.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,7 @@ where C: Connect,
243243
},
244244
Err(_) => {
245245
error!("pooled connection was not ready, this is a hyper bug");
246-
let err = io::Error::new(
247-
io::ErrorKind::BrokenPipe,
248-
"pool selected dead connection",
249-
);
250-
Either::B(future::err(::Error::Io(err)))
246+
Either::B(future::err(::Error::new_canceled(None)))
251247
}
252248
}
253249
});

src/error.rs

+5-12
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ pub enum Error {
6060
}
6161

6262
impl Error {
63-
pub(crate) fn new_canceled() -> Error {
63+
pub(crate) fn new_canceled(cause: Option<Error>) -> Error {
6464
Error::Cancel(Canceled {
65-
_inner: (),
65+
cause: cause.map(Box::new),
6666
})
6767
}
6868
}
@@ -73,10 +73,9 @@ impl Error {
7373
/// as the related connection gets closed by the remote. In that case,
7474
/// when the connection drops, the pending response future will be
7575
/// fulfilled with this error, signaling the `Request` was never started.
76+
#[derive(Debug)]
7677
pub struct Canceled {
77-
// maybe in the future this contains an optional value of
78-
// what was canceled?
79-
_inner: (),
78+
cause: Option<Box<Error>>,
8079
}
8180

8281
impl Canceled {
@@ -85,13 +84,6 @@ impl Canceled {
8584
}
8685
}
8786

88-
impl fmt::Debug for Canceled {
89-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90-
f.debug_struct("Canceled")
91-
.finish()
92-
}
93-
}
94-
9587
impl fmt::Display for Canceled {
9688
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
9789
f.pad(self.description())
@@ -142,6 +134,7 @@ impl StdError for Error {
142134
Io(ref error) => Some(error),
143135
Uri(ref error) => Some(error),
144136
Utf8(ref error) => Some(error),
137+
Cancel(ref e) => e.cause.as_ref().map(|e| &**e as &StdError),
145138
Error::__Nonexhaustive(..) => unreachable!(),
146139
_ => None,
147140
}

src/proto/h1/conn.rs

+58-24
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ where I: AsyncRead + AsyncWrite,
119119
} else {
120120
trace!("poll when on keep-alive");
121121
if !T::should_read_first() {
122-
self.try_empty_read()?;
122+
self.require_empty_read()?;
123123
if self.is_read_closed() {
124124
return Ok(Async::Ready(None));
125125
}
@@ -281,17 +281,23 @@ where I: AsyncRead + AsyncWrite,
281281
pub fn read_keep_alive(&mut self) -> Result<(), ::Error> {
282282
debug_assert!(!self.can_read_head() && !self.can_read_body());
283283

284-
trace!("Conn::read_keep_alive");
284+
trace!("read_keep_alive; is_mid_message={}", self.is_mid_message());
285285

286-
if T::should_read_first() || !self.state.is_idle() {
286+
if self.is_mid_message() {
287287
self.maybe_park_read();
288288
} else {
289-
self.try_empty_read()?;
289+
self.require_empty_read()?;
290290
}
291-
292291
Ok(())
293292
}
294293

294+
fn is_mid_message(&self) -> bool {
295+
match (&self.state.reading, &self.state.writing) {
296+
(&Reading::Init, &Writing::Init) => false,
297+
_ => true,
298+
}
299+
}
300+
295301
fn maybe_park_read(&mut self) {
296302
if !self.io.is_read_blocked() {
297303
// the Io object is ready to read, which means it will never alert
@@ -312,40 +318,68 @@ where I: AsyncRead + AsyncWrite,
312318
//
313319
// This should only be called for Clients wanting to enter the idle
314320
// state.
315-
fn try_empty_read(&mut self) -> io::Result<()> {
321+
fn require_empty_read(&mut self) -> io::Result<()> {
316322
assert!(!self.can_read_head() && !self.can_read_body());
317323

318324
if !self.io.read_buf().is_empty() {
319325
debug!("received an unexpected {} bytes", self.io.read_buf().len());
320326
Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended"))
321327
} else {
322-
match self.io.read_from_io() {
323-
Ok(Async::Ready(0)) => {
324-
trace!("try_empty_read; found EOF on connection: {:?}", self.state);
325-
let must_error = self.should_error_on_eof();
326-
// order is important: must_error needs state BEFORE close_read
327-
self.state.close_read();
328-
if must_error {
329-
Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF waiting for response"))
330-
} else {
331-
Ok(())
332-
}
328+
match self.try_io_read()? {
329+
Async::Ready(0) => {
330+
// case handled in try_io_read
331+
Ok(())
333332
},
334-
Ok(Async::Ready(n)) => {
333+
Async::Ready(n) => {
335334
debug!("received {} bytes on an idle connection", n);
336-
Err(io::Error::new(io::ErrorKind::InvalidData, "unexpected bytes after message ended"))
335+
let desc = if self.state.is_idle() {
336+
"unexpected bytes after message ended"
337+
} else {
338+
"unexpected bytes before writing message"
339+
};
340+
Err(io::Error::new(io::ErrorKind::InvalidData, desc))
337341
},
338-
Ok(Async::NotReady) => {
342+
Async::NotReady => {
339343
Ok(())
340344
},
341-
Err(e) => {
342-
self.state.close();
343-
Err(e)
344-
}
345345
}
346346
}
347347
}
348348

349+
fn try_io_read(&mut self) -> Poll<usize, io::Error> {
350+
match self.io.read_from_io() {
351+
Ok(Async::Ready(0)) => {
352+
trace!("try_io_read; found EOF on connection: {:?}", self.state);
353+
let must_error = self.should_error_on_eof();
354+
let ret = if must_error {
355+
let desc = if self.is_mid_message() {
356+
"unexpected EOF waiting for response"
357+
} else {
358+
"unexpected EOF before writing message"
359+
};
360+
Err(io::Error::new(io::ErrorKind::UnexpectedEof, desc))
361+
} else {
362+
Ok(Async::Ready(0))
363+
};
364+
365+
// order is important: must_error needs state BEFORE close_read
366+
self.state.close_read();
367+
ret
368+
},
369+
Ok(Async::Ready(n)) => {
370+
Ok(Async::Ready(n))
371+
},
372+
Ok(Async::NotReady) => {
373+
Ok(Async::NotReady)
374+
},
375+
Err(e) => {
376+
self.state.close();
377+
Err(e)
378+
}
379+
}
380+
}
381+
382+
349383
fn maybe_notify(&mut self) {
350384
// its possible that we returned NotReady from poll() without having
351385
// exhausted the underlying Io. We would have done this when we

src/proto/h1/dispatch.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,9 @@ where
401401
let _ = cb.send(Err(err));
402402
Ok(())
403403
} else if let Ok(Async::Ready(Some((_, cb)))) = self.rx.poll() {
404-
let _ = cb.send(Err(err));
404+
// in this case, the message was never even started, so it's safe to tell
405+
// the user that the request was completely canceled
406+
let _ = cb.send(Err(::Error::new_canceled(Some(err))));
405407
Ok(())
406408
} else {
407409
Err(err)
@@ -431,13 +433,14 @@ where
431433

432434
#[cfg(test)]
433435
mod tests {
436+
extern crate pretty_env_logger;
437+
434438
use super::*;
435439
use mock::AsyncIo;
436440
use proto::ClientTransaction;
437441

438442
#[test]
439-
fn client_read_response_before_writing_request() {
440-
extern crate pretty_env_logger;
443+
fn client_read_bytes_before_writing_request() {
441444
let _ = pretty_env_logger::try_init();
442445
::futures::lazy(|| {
443446
let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 100);
@@ -452,11 +455,16 @@ mod tests {
452455
};
453456
let res_rx = tx.send((req, None::<::Body>)).unwrap();
454457

455-
dispatcher.poll().expect("dispatcher poll 1");
456-
dispatcher.poll().expect("dispatcher poll 2");
457-
let _res = res_rx.wait()
458+
let a1 = dispatcher.poll().expect("error should be sent on channel");
459+
assert!(a1.is_ready(), "dispatcher should be closed");
460+
let err = res_rx.wait()
458461
.expect("callback poll")
459-
.expect("callback response");
462+
.expect_err("callback response");
463+
464+
match err {
465+
::Error::Cancel(_) => (),
466+
other => panic!("expected Cancel(_), got {:?}", other),
467+
}
460468
Ok::<(), ()>(())
461469
}).wait().unwrap();
462470
}

0 commit comments

Comments
 (0)