Skip to content

Commit 7a48d0e

Browse files
committed
fix(lib): properly handle body streaming errors
1 parent 7888451 commit 7a48d0e

File tree

4 files changed

+143
-77
lines changed

4 files changed

+143
-77
lines changed

src/proto/conn.rs

+24-11
Original file line numberDiff line numberDiff line change
@@ -235,18 +235,31 @@ where I: AsyncRead + AsyncWrite,
235235

236236
let (reading, ret) = match self.state.reading {
237237
Reading::Body(ref mut decoder) => {
238-
let slice = try_ready!(decoder.decode(&mut self.io));
239-
if !slice.is_empty() {
240-
return Ok(Async::Ready(Some(super::Chunk::from(slice))));
241-
} else if decoder.is_eof() {
242-
debug!("incoming body completed");
243-
(Reading::KeepAlive, Ok(Async::Ready(None)))
244-
} else {
245-
trace!("decode stream unexpectedly ended");
246-
//TODO: Should this return an UnexpectedEof?
247-
(Reading::Closed, Ok(Async::Ready(None)))
238+
match decoder.decode(&mut self.io) {
239+
Ok(Async::Ready(slice)) => {
240+
let chunk = if !slice.is_empty() {
241+
Some(super::Chunk::from(slice))
242+
} else {
243+
None
244+
};
245+
let reading = if decoder.is_eof() {
246+
debug!("incoming body completed");
247+
Reading::KeepAlive
248+
} else if chunk.is_some() {
249+
Reading::Body(decoder.clone())
250+
} else {
251+
trace!("decode stream unexpectedly ended");
252+
//TODO: Should this return an UnexpectedEof?
253+
Reading::Closed
254+
};
255+
(reading, Ok(Async::Ready(chunk)))
256+
},
257+
Ok(Async::NotReady) => return Ok(Async::NotReady),
258+
Err(e) => {
259+
trace!("decode stream error: {}", e);
260+
(Reading::Closed, Err(e))
261+
},
248262
}
249-
250263
},
251264
_ => unreachable!("read_body invalid state: {:?}", self.state.reading),
252265
};

src/proto/dispatch.rs

+52-52
Original file line numberDiff line numberDiff line change
@@ -85,66 +85,25 @@ where
8585
if self.is_closing {
8686
return Ok(Async::Ready(()));
8787
} else if self.conn.can_read_head() {
88-
// can dispatch receive, or does it still care about, an incoming message?
89-
match self.dispatch.poll_ready() {
90-
Ok(Async::Ready(())) => (),
91-
Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"),
92-
Err(()) => {
93-
trace!("dispatch no longer receiving messages");
94-
self.close();
95-
return Ok(Async::Ready(()));
96-
}
97-
}
98-
// dispatch is ready for a message, try to read one
99-
match self.conn.read_head() {
100-
Ok(Async::Ready(Some((head, has_body)))) => {
101-
let body = if has_body {
102-
let (mut tx, rx) = super::body::channel();
103-
let _ = tx.poll_ready(); // register this task if rx is dropped
104-
self.body_tx = Some(tx);
105-
Some(rx)
106-
} else {
107-
None
108-
};
109-
self.dispatch.recv_msg(Ok((head, body)))?;
110-
},
111-
Ok(Async::Ready(None)) => {
112-
// read eof, conn will start to shutdown automatically
113-
return Ok(Async::Ready(()));
114-
}
115-
Ok(Async::NotReady) => return Ok(Async::NotReady),
116-
Err(err) => {
117-
debug!("read_head error: {}", err);
118-
self.dispatch.recv_msg(Err(err))?;
119-
// if here, the dispatcher gave the user the error
120-
// somewhere else. we still need to shutdown, but
121-
// not as a second error.
122-
return Ok(Async::Ready(()));
123-
}
124-
}
88+
try_ready!(self.poll_read_head());
12589
} else if self.conn.can_write_continue() {
12690
try_nb!(self.conn.flush());
12791
} else if let Some(mut body) = self.body_tx.take() {
128-
let can_read_body = self.conn.can_read_body();
129-
match body.poll_ready() {
130-
Ok(Async::Ready(())) => (),
131-
Ok(Async::NotReady) => {
132-
self.body_tx = Some(body);
133-
return Ok(Async::NotReady);
134-
},
135-
Err(_canceled) => {
136-
// user doesn't care about the body
137-
// so we should stop reading
138-
if can_read_body {
92+
if self.conn.can_read_body() {
93+
match body.poll_ready() {
94+
Ok(Async::Ready(())) => (),
95+
Ok(Async::NotReady) => {
96+
self.body_tx = Some(body);
97+
return Ok(Async::NotReady);
98+
},
99+
Err(_canceled) => {
100+
// user doesn't care about the body
101+
// so we should stop reading
139102
trace!("body receiver dropped before eof, closing");
140103
self.conn.close_read();
141104
return Ok(Async::Ready(()));
142105
}
143-
// else the conn body is done, and user dropped,
144-
// so everything is fine!
145106
}
146-
}
147-
if can_read_body {
148107
match self.conn.read_body() {
149108
Ok(Async::Ready(Some(chunk))) => {
150109
match body.start_send(Ok(chunk)) {
@@ -183,6 +142,47 @@ where
183142
}
184143
}
185144

145+
fn poll_read_head(&mut self) -> Poll<(), ::Error> {
146+
// can dispatch receive, or does it still care about, an incoming message?
147+
match self.dispatch.poll_ready() {
148+
Ok(Async::Ready(())) => (),
149+
Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"),
150+
Err(()) => {
151+
trace!("dispatch no longer receiving messages");
152+
self.close();
153+
return Ok(Async::Ready(()));
154+
}
155+
}
156+
// dispatch is ready for a message, try to read one
157+
match self.conn.read_head() {
158+
Ok(Async::Ready(Some((head, has_body)))) => {
159+
let body = if has_body {
160+
let (mut tx, rx) = super::body::channel();
161+
let _ = tx.poll_ready(); // register this task if rx is dropped
162+
self.body_tx = Some(tx);
163+
Some(rx)
164+
} else {
165+
None
166+
};
167+
self.dispatch.recv_msg(Ok((head, body)))?;
168+
Ok(Async::Ready(()))
169+
},
170+
Ok(Async::Ready(None)) => {
171+
// read eof, conn will start to shutdown automatically
172+
Ok(Async::Ready(()))
173+
}
174+
Ok(Async::NotReady) => Ok(Async::NotReady),
175+
Err(err) => {
176+
debug!("read_head error: {}", err);
177+
self.dispatch.recv_msg(Err(err))?;
178+
// if here, the dispatcher gave the user the error
179+
// somewhere else. we still need to shutdown, but
180+
// not as a second error.
181+
Ok(Async::Ready(()))
182+
}
183+
}
184+
}
185+
186186
fn poll_write(&mut self) -> Poll<(), ::Error> {
187187
loop {
188188
if self.is_closing {

src/proto/h1/decode.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::error::Error as StdError;
12
use std::fmt;
23
use std::usize;
34
use std::io;
@@ -97,7 +98,7 @@ impl Decoder {
9798
if num > *remaining {
9899
*remaining = 0;
99100
} else if num == 0 {
100-
return Err(io::Error::new(io::ErrorKind::Other, "early eof"));
101+
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, IncompleteBody));
101102
} else {
102103
*remaining -= num;
103104
}
@@ -262,7 +263,7 @@ impl ChunkedState {
262263

263264
if count == 0 {
264265
*rem = 0;
265-
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof"));
266+
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, IncompleteBody));
266267
}
267268
*buf = Some(slice);
268269
*rem -= count as u64;
@@ -300,9 +301,23 @@ impl ChunkedState {
300301
}
301302
}
302303

304+
#[derive(Debug)]
305+
struct IncompleteBody;
306+
307+
impl fmt::Display for IncompleteBody {
308+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
309+
f.write_str(self.description())
310+
}
311+
}
312+
313+
impl StdError for IncompleteBody {
314+
fn description(&self) -> &str {
315+
"end of file before message length reached"
316+
}
317+
}
318+
303319
#[cfg(test)]
304320
mod tests {
305-
use std::error::Error;
306321
use std::io;
307322
use std::io::Write;
308323
use super::Decoder;
@@ -422,8 +437,7 @@ mod tests {
422437
let mut decoder = Decoder::length(10);
423438
assert_eq!(decoder.decode(&mut bytes).unwrap().unwrap().len(), 7);
424439
let e = decoder.decode(&mut bytes).unwrap_err();
425-
assert_eq!(e.kind(), io::ErrorKind::Other);
426-
assert_eq!(e.description(), "early eof");
440+
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
427441
}
428442

429443
#[test]
@@ -436,7 +450,6 @@ mod tests {
436450
assert_eq!(decoder.decode(&mut bytes).unwrap().unwrap().len(), 7);
437451
let e = decoder.decode(&mut bytes).unwrap_err();
438452
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
439-
assert_eq!(e.description(), "early eof");
440453
}
441454

442455
#[test]

tests/server.rs

+48-8
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio_core::net::TcpListener;
1414
use tokio_core::reactor::{Core, Timeout};
1515
use tokio_io::{AsyncRead, AsyncWrite};
1616

17-
use std::net::{TcpStream, SocketAddr};
17+
use std::net::{TcpStream, Shutdown, SocketAddr};
1818
use std::io::{self, Read, Write};
1919
use std::sync::atomic::{AtomicBool, Ordering};
2020
use std::sync::mpsc;
@@ -223,6 +223,26 @@ fn post_with_chunked_body() {
223223
assert_eq!(server.body(), b"qwert");
224224
}
225225

226+
#[test]
227+
fn post_with_incomplete_body() {
228+
extern crate pretty_env_logger;
229+
let _ = pretty_env_logger::try_init();
230+
let server = serve();
231+
let mut req = connect(server.addr());
232+
req.write_all(b"\
233+
POST / HTTP/1.1\r\n\
234+
Host: example.domain\r\n\
235+
Content-Length: 10\r\n\
236+
\r\n\
237+
12345\
238+
").expect("write");
239+
req.shutdown(Shutdown::Write).expect("shutdown write");
240+
241+
server.body_err();
242+
243+
req.read(&mut [0; 256]).expect("read");
244+
}
245+
226246
#[test]
227247
fn empty_response_chunked() {
228248
let server = serve();
@@ -746,24 +766,34 @@ impl Serve {
746766
}
747767

748768
pub fn remote_addr(&self) -> SocketAddr {
749-
match self.msg_rx.try_recv() {
769+
match self.msg_rx.recv() {
750770
Ok(Msg::Addr(addr)) => addr,
751771
other => panic!("expected remote addr, found: {:?}", other),
752772
}
753773
}
754774

755775
fn body(&self) -> Vec<u8> {
776+
self.try_body().expect("body")
777+
}
778+
779+
fn body_err(&self) -> hyper::Error {
780+
self.try_body().expect_err("body_err")
781+
}
782+
783+
fn try_body(&self) -> Result<Vec<u8>, hyper::Error> {
756784
let mut buf = vec![];
757785
loop {
758-
match self.msg_rx.try_recv() {
786+
match self.msg_rx.recv() {
759787
Ok(Msg::Chunk(msg)) => {
760788
buf.extend(&msg);
761789
},
762790
Ok(Msg::Addr(_)) => {},
763-
Err(_) => break,
791+
Ok(Msg::Error(e)) => return Err(e),
792+
Ok(Msg::End) => break,
793+
Err(e) => panic!("expected body, found: {:?}", e),
764794
}
765795
}
766-
buf
796+
Ok(buf)
767797
}
768798

769799
fn reply(&self) -> ReplyBuilder {
@@ -821,6 +851,8 @@ enum Msg {
821851
//Head(Request),
822852
Addr(SocketAddr),
823853
Chunk(Vec<u8>),
854+
Error(hyper::Error),
855+
End,
824856
}
825857

826858
impl NewService for TestService {
@@ -841,15 +873,23 @@ impl Service for TestService {
841873
type Error = hyper::Error;
842874
type Future = Box<Future<Item=Response, Error=hyper::Error>>;
843875
fn call(&self, req: Request) -> Self::Future {
844-
let tx = self.tx.clone();
876+
let tx1 = self.tx.clone();
877+
let tx2 = self.tx.clone();
845878

846879
#[allow(deprecated)]
847880
let remote_addr = req.remote_addr().expect("remote_addr");
848-
tx.lock().unwrap().send(Msg::Addr(remote_addr)).unwrap();
881+
tx1.lock().unwrap().send(Msg::Addr(remote_addr)).unwrap();
849882

850883
let replies = self.reply.clone();
851884
Box::new(req.body().for_each(move |chunk| {
852-
tx.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap();
885+
tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap();
886+
Ok(())
887+
}).then(move |result| {
888+
let msg = match result {
889+
Ok(()) => Msg::End,
890+
Err(e) => Msg::Error(e),
891+
};
892+
tx2.lock().unwrap().send(msg).unwrap();
853893
Ok(())
854894
}).map(move |_| {
855895
let mut res = Response::new();

0 commit comments

Comments
 (0)