Skip to content

Commit c119097

Browse files
committed
feat(http2): add HTTP/2 support for Client and Server
1 parent fe1578a commit c119097

25 files changed

+2015
-364
lines changed

Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ futures-cpupool = "0.1.6"
2727
futures-timer = "0.1.0"
2828
http = "0.1.5"
2929
httparse = "1.0"
30+
h2 = "0.1.5"
3031
iovec = "0.1"
3132
log = "0.4"
3233
net2 = "0.2.32"
@@ -35,7 +36,7 @@ tokio = "0.1.5"
3536
tokio-executor = "0.1.0"
3637
tokio-service = "0.1"
3738
tokio-io = "0.1"
38-
want = "0.0.2"
39+
want = "0.0.3"
3940

4041
[dev-dependencies]
4142
num_cpus = "1.0"

examples/multi_server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ fn main() {
5151
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
5252

5353
tokio::spawn(srv1.for_each(move |conn| {
54-
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
54+
tokio::spawn(conn.map_err(|err| println!("srv1 error: {:?}", err)));
5555
Ok(())
5656
}).map_err(|_| ()));
5757

5858
tokio::spawn(srv2.for_each(move |conn| {
59-
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
59+
tokio::spawn(conn.map_err(|err| println!("srv2 error: {:?}", err)));
6060
Ok(())
6161
}).map_err(|_| ()));
6262

examples/web_api.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ fn main() {
8383
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
8484

8585
serve.map_err(|_| ()).for_each(move |conn| {
86-
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err)))
86+
tokio::spawn(conn.map_err(|err| println!("serve error: {:?}", err)))
8787
})
8888
}));
8989
}

src/body.rs

+28-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::fmt;
55
use bytes::Bytes;
66
use futures::{Async, Future, Poll, Stream};
77
use futures::sync::{mpsc, oneshot};
8+
use h2;
89
use http::HeaderMap;
910

1011
use common::Never;
@@ -13,9 +14,9 @@ use super::Chunk;
1314
type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
1415

1516
/// This trait represents a streaming body of a `Request` or `Response`.
16-
pub trait Payload {
17+
pub trait Payload: Send + 'static {
1718
/// A buffer of bytes representing a single chunk of a body.
18-
type Data: AsRef<[u8]>;
19+
type Data: AsRef<[u8]> + Send;
1920

2021
/// The error type of this stream.
2122
type Error: Into<Box<::std::error::Error + Send + Sync>>;
@@ -107,6 +108,7 @@ enum Kind {
107108
_close_tx: oneshot::Sender<()>,
108109
rx: mpsc::Receiver<Result<Chunk, ::Error>>,
109110
},
111+
H2(h2::RecvStream),
110112
Wrapped(Box<Stream<Item=Chunk, Error=Box<::std::error::Error + Send + Sync>> + Send>),
111113
Once(Option<Chunk>),
112114
Empty,
@@ -219,6 +221,10 @@ impl Body {
219221
}
220222
}
221223

224+
pub(crate) fn h2(recv: h2::RecvStream) -> Self {
225+
Body::new(Kind::H2(recv))
226+
}
227+
222228
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
223229
self.delayed_eof = Some(DelayEof::NotEof(fut));
224230
}
@@ -269,6 +275,17 @@ impl Body {
269275
Async::Ready(None) => Ok(Async::Ready(None)),
270276
Async::NotReady => Ok(Async::NotReady),
271277
},
278+
Kind::H2(ref mut h2) => {
279+
h2.poll()
280+
.map(|async| {
281+
async.map(|opt| {
282+
opt.map(|bytes| {
283+
Chunk::h2(bytes, h2.release_capacity())
284+
})
285+
})
286+
})
287+
.map_err(::Error::new_body)
288+
},
272289
Kind::Wrapped(ref mut s) => s.poll().map_err(::Error::new_body),
273290
Kind::Once(ref mut val) => Ok(Async::Ready(val.take())),
274291
Kind::Empty => Ok(Async::Ready(None)),
@@ -291,9 +308,17 @@ impl Payload for Body {
291308
self.poll_eof()
292309
}
293310

311+
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
312+
match self.kind {
313+
Kind::H2(ref mut h2) => h2.poll_trailers().map_err(::Error::new_h2),
314+
_ => Ok(Async::Ready(None)),
315+
}
316+
}
317+
294318
fn is_end_stream(&self) -> bool {
295319
match self.kind {
296320
Kind::Chan { .. } => false,
321+
Kind::H2(..) => false,
297322
Kind::Wrapped(..) => false,
298323
Kind::Once(ref val) => val.is_none(),
299324
Kind::Empty => true
@@ -303,6 +328,7 @@ impl Payload for Body {
303328
fn content_length(&self) -> Option<u64> {
304329
match self.kind {
305330
Kind::Chan { .. } => None,
331+
Kind::H2(..) => None,
306332
Kind::Wrapped(..) => None,
307333
Kind::Once(Some(ref val)) => Some(val.len() as u64),
308334
Kind::Once(None) => None,

src/chunk.rs

+38-17
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,38 @@
11
use std::fmt;
22

33
use bytes::Bytes;
4+
use h2::ReleaseCapacity;
45

56
/// A piece of a message body.
67
pub struct Chunk(Inner);
78

8-
enum Inner {
9-
Shared(Bytes),
9+
struct Inner {
10+
bytes: Bytes,
11+
_flow_control: Option<AutoRelease>,
12+
}
13+
14+
struct AutoRelease {
15+
cap: usize,
16+
release: ReleaseCapacity,
17+
}
18+
19+
impl Drop for AutoRelease {
20+
fn drop(&mut self) {
21+
let _ = self.release.release_capacity(self.cap);
22+
}
23+
}
24+
25+
impl Chunk {
26+
pub(crate) fn h2(bytes: Bytes, rel_cap: &ReleaseCapacity) -> Chunk {
27+
let cap = bytes.len();
28+
Chunk(Inner {
29+
bytes: bytes,
30+
_flow_control: Some(AutoRelease {
31+
cap: cap,
32+
release: rel_cap.clone(),
33+
}),
34+
})
35+
}
1036
}
1137

1238
impl From<Vec<u8>> for Chunk {
@@ -39,17 +65,18 @@ impl From<&'static str> for Chunk {
3965

4066
impl From<Bytes> for Chunk {
4167
#[inline]
42-
fn from(mem: Bytes) -> Chunk {
43-
Chunk(Inner::Shared(mem))
68+
fn from(bytes: Bytes) -> Chunk {
69+
Chunk(Inner {
70+
bytes: bytes,
71+
_flow_control: None,
72+
})
4473
}
4574
}
4675

4776
impl From<Chunk> for Bytes {
4877
#[inline]
4978
fn from(chunk: Chunk) -> Bytes {
50-
match chunk.0 {
51-
Inner::Shared(bytes) => bytes,
52-
}
79+
chunk.0.bytes
5380
}
5481
}
5582

@@ -65,9 +92,7 @@ impl ::std::ops::Deref for Chunk {
6592
impl AsRef<[u8]> for Chunk {
6693
#[inline]
6794
fn as_ref(&self) -> &[u8] {
68-
match self.0 {
69-
Inner::Shared(ref slice) => slice,
70-
}
95+
&self.0.bytes
7196
}
7297
}
7398

@@ -81,7 +106,7 @@ impl fmt::Debug for Chunk {
81106
impl Default for Chunk {
82107
#[inline]
83108
fn default() -> Chunk {
84-
Chunk(Inner::Shared(Bytes::new()))
109+
Chunk::from(Bytes::new())
85110
}
86111
}
87112

@@ -91,17 +116,13 @@ impl IntoIterator for Chunk {
91116

92117
#[inline]
93118
fn into_iter(self) -> Self::IntoIter {
94-
match self.0 {
95-
Inner::Shared(bytes) => bytes.into_iter(),
96-
}
119+
self.0.bytes.into_iter()
97120
}
98121
}
99122

100123
impl Extend<u8> for Chunk {
101124
#[inline]
102125
fn extend<T>(&mut self, iter: T) where T: IntoIterator<Item=u8> {
103-
match self.0 {
104-
Inner::Shared(ref mut bytes) => bytes.extend(iter)
105-
}
126+
self.0.bytes.extend(iter)
106127
}
107128
}

0 commit comments

Comments
 (0)