Skip to content

Commit fac165e

Browse files
michaelbeaumontseanmonstar
authored andcommitted
Add server support for push (#327)
Closes #291, closes #185
1 parent 0527f5b commit fac165e

File tree

10 files changed

+736
-135
lines changed

10 files changed

+736
-135
lines changed

src/frame/headers.rs

+84-32
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::frame::{Error, Frame, Head, Kind};
33
use crate::hpack;
44

55
use http::header::{self, HeaderName, HeaderValue};
6-
use http::{uri, HeaderMap, Method, StatusCode, Uri};
6+
use http::{uri, HeaderMap, Method, Request, StatusCode, Uri};
77

88
use bytes::{Bytes, BytesMut};
99
use string::String;
@@ -300,9 +300,92 @@ impl fmt::Debug for Headers {
300300
}
301301
}
302302

303+
// ===== util =====
304+
305+
pub fn parse_u64(src: &[u8]) -> Result<u64, ()> {
306+
if src.len() > 19 {
307+
// At danger for overflow...
308+
return Err(());
309+
}
310+
311+
let mut ret = 0;
312+
313+
for &d in src {
314+
if d < b'0' || d > b'9' {
315+
return Err(());
316+
}
317+
318+
ret *= 10;
319+
ret += (d - b'0') as u64;
320+
}
321+
322+
Ok(ret)
323+
}
324+
303325
// ===== impl PushPromise =====
304326

327+
#[derive(Debug)]
328+
pub enum PushPromiseHeaderError {
329+
InvalidContentLength(Result<u64, ()>),
330+
NotSafeAndCacheable,
331+
}
332+
305333
impl PushPromise {
334+
pub fn new(
335+
stream_id: StreamId,
336+
promised_id: StreamId,
337+
pseudo: Pseudo,
338+
fields: HeaderMap,
339+
) -> Self {
340+
PushPromise {
341+
flags: PushPromiseFlag::default(),
342+
header_block: HeaderBlock {
343+
fields,
344+
is_over_size: false,
345+
pseudo,
346+
},
347+
promised_id,
348+
stream_id,
349+
}
350+
}
351+
352+
pub fn validate_request(req: &Request<()>) -> Result<(), PushPromiseHeaderError> {
353+
use PushPromiseHeaderError::*;
354+
// The spec has some requirements for promised request headers
355+
// [https://httpwg.org/specs/rfc7540.html#PushRequests]
356+
357+
// A promised request "that indicates the presence of a request body
358+
// MUST reset the promised stream with a stream error"
359+
if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) {
360+
let parsed_length = parse_u64(content_length.as_bytes());
361+
if parsed_length != Ok(0) {
362+
return Err(InvalidContentLength(parsed_length));
363+
}
364+
}
365+
// "The server MUST include a method in the :method pseudo-header field
366+
// that is safe and cacheable"
367+
if !Self::safe_and_cacheable(req.method()) {
368+
return Err(NotSafeAndCacheable);
369+
}
370+
371+
Ok(())
372+
}
373+
374+
fn safe_and_cacheable(method: &Method) -> bool {
375+
// Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods
376+
// Safe: https://httpwg.org/specs/rfc7231.html#safe.methods
377+
return method == Method::GET || method == Method::HEAD;
378+
}
379+
380+
pub fn fields(&self) -> &HeaderMap {
381+
&self.header_block.fields
382+
}
383+
384+
#[cfg(feature = "unstable")]
385+
pub fn into_fields(self) -> HeaderMap {
386+
self.header_block.fields
387+
}
388+
306389
/// Loads the push promise frame but doesn't actually do HPACK decoding.
307390
///
308391
/// HPACK decoding is done in the `load_hpack` step.
@@ -401,44 +484,13 @@ impl PushPromise {
401484
fn head(&self) -> Head {
402485
Head::new(Kind::PushPromise, self.flags.into(), self.stream_id)
403486
}
404-
}
405487

406-
impl PushPromise {
407488
/// Consume `self`, returning the parts of the frame
408489
pub fn into_parts(self) -> (Pseudo, HeaderMap) {
409490
(self.header_block.pseudo, self.header_block.fields)
410491
}
411492
}
412493

413-
#[cfg(feature = "unstable")]
414-
impl PushPromise {
415-
pub fn new(
416-
stream_id: StreamId,
417-
promised_id: StreamId,
418-
pseudo: Pseudo,
419-
fields: HeaderMap,
420-
) -> Self {
421-
PushPromise {
422-
flags: PushPromiseFlag::default(),
423-
header_block: HeaderBlock {
424-
fields,
425-
is_over_size: false,
426-
pseudo,
427-
},
428-
promised_id,
429-
stream_id,
430-
}
431-
}
432-
433-
pub fn fields(&self) -> &HeaderMap {
434-
&self.header_block.fields
435-
}
436-
437-
pub fn into_fields(self) -> HeaderMap {
438-
self.header_block.fields
439-
}
440-
}
441-
442494
impl<T> From<PushPromise> for Frame<T> {
443495
fn from(src: PushPromise) -> Self {
444496
Frame::PushPromise(src)

src/frame/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ mod window_update;
4242
pub use self::data::Data;
4343
pub use self::go_away::GoAway;
4444
pub use self::head::{Head, Kind};
45-
pub use self::headers::{Continuation, Headers, Pseudo, PushPromise};
45+
pub use self::headers::{
46+
parse_u64, Continuation, Headers, Pseudo, PushPromise, PushPromiseHeaderError,
47+
};
4648
pub use self::ping::Ping;
4749
pub use self::priority::{Priority, StreamDependency};
4850
pub use self::reason::Reason;

src/proto/streams/prioritize.rs

+19-13
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl Prioritize {
111111

112112
pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
113113
// If the stream is waiting to be opened, nothing more to do.
114-
if !stream.is_pending_open {
114+
if stream.is_send_ready() {
115115
log::trace!("schedule_send; {:?}", stream.id);
116116
// Queue the stream
117117
self.pending_send.push(stream);
@@ -445,19 +445,9 @@ impl Prioritize {
445445
self.pending_capacity.push(stream);
446446
}
447447

448-
// If data is buffered and the stream is not pending open, then
448+
// If data is buffered and the stream is send ready, then
449449
// schedule the stream for execution
450-
//
451-
// Why do we not push into pending_send when the stream is in pending_open?
452-
//
453-
// We allow users to call send_request() which schedules a stream to be pending_open
454-
// if there is no room according to the concurrency limit (max_send_streams), and we
455-
// also allow data to be buffered for send with send_data() if there is no capacity for
456-
// the stream to send the data, which attempts to place the stream in pending_send.
457-
// If the stream is not open, we don't want the stream to be scheduled for
458-
// execution (pending_send). Note that if the stream is in pending_open, it will be
459-
// pushed to pending_send when there is room for an open stream.
460-
if stream.buffered_send_data > 0 && !stream.is_pending_open {
450+
if stream.buffered_send_data > 0 && stream.is_send_ready() {
461451
// TODO: This assertion isn't *exactly* correct. There can still be
462452
// buffered send data while the stream's pending send queue is
463453
// empty. This can happen when a large data frame is in the process
@@ -766,6 +756,22 @@ impl Prioritize {
766756
stream: stream.key(),
767757
}))
768758
}
759+
Some(Frame::PushPromise(pp)) => {
760+
let mut pushed =
761+
stream.store_mut().find_mut(&pp.promised_id()).unwrap();
762+
pushed.is_pending_push = false;
763+
// Transition stream from pending_push to pending_open
764+
// if possible
765+
if !pushed.pending_send.is_empty() {
766+
if counts.can_inc_num_send_streams() {
767+
counts.inc_num_send_streams(&mut pushed);
768+
self.pending_send.push(&mut pushed);
769+
} else {
770+
self.queue_open(&mut pushed);
771+
}
772+
}
773+
Frame::PushPromise(pp)
774+
}
769775
Some(frame) => frame.map(|_| {
770776
unreachable!(
771777
"Frame::map closure will only be called \

src/proto/streams/recv.rs

+20-61
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use super::*;
22
use crate::codec::{RecvError, UserError};
3-
use crate::frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
3+
use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
44
use crate::{frame, proto};
55
use std::task::Context;
66

7-
use http::{HeaderMap, Method, Request, Response};
7+
use http::{HeaderMap, Request, Response};
88

99
use std::io;
1010
use std::task::{Poll, Waker};
@@ -178,7 +178,7 @@ impl Recv {
178178
use http::header;
179179

180180
if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
181-
let content_length = match parse_u64(content_length.as_bytes()) {
181+
let content_length = match frame::parse_u64(content_length.as_bytes()) {
182182
Ok(v) => v,
183183
Err(()) => {
184184
proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
@@ -632,44 +632,31 @@ impl Recv {
632632
}
633633

634634
let promised_id = frame.promised_id();
635-
use http::header;
636635
let (pseudo, fields) = frame.into_parts();
637636
let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
638-
// The spec has some requirements for promised request headers
639-
// [https://httpwg.org/specs/rfc7540.html#PushRequests]
640-
641-
// A promised request "that indicates the presence of a request body
642-
// MUST reset the promised stream with a stream error"
643-
if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) {
644-
match parse_u64(content_length.as_bytes()) {
645-
Ok(0) => {}
646-
otherwise => {
647-
proto_err!(stream:
648-
"recv_push_promise; promised request has content-length {:?}; promised_id={:?}",
649-
otherwise,
650-
promised_id,
651-
);
652-
return Err(RecvError::Stream {
653-
id: promised_id,
654-
reason: Reason::PROTOCOL_ERROR,
655-
});
656-
}
637+
638+
if let Err(e) = frame::PushPromise::validate_request(&req) {
639+
use PushPromiseHeaderError::*;
640+
match e {
641+
NotSafeAndCacheable => proto_err!(
642+
stream:
643+
"recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
644+
req.method(),
645+
promised_id,
646+
),
647+
InvalidContentLength(e) => proto_err!(
648+
stream:
649+
"recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
650+
e,
651+
promised_id,
652+
),
657653
}
658-
}
659-
// "The server MUST include a method in the :method pseudo-header field
660-
// that is safe and cacheable"
661-
if !Self::safe_and_cacheable(req.method()) {
662-
proto_err!(
663-
stream:
664-
"recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
665-
req.method(),
666-
promised_id,
667-
);
668654
return Err(RecvError::Stream {
669655
id: promised_id,
670656
reason: Reason::PROTOCOL_ERROR,
671657
});
672658
}
659+
673660
use super::peer::PollMessage::*;
674661
stream
675662
.pending_recv
@@ -678,12 +665,6 @@ impl Recv {
678665
Ok(())
679666
}
680667

681-
fn safe_and_cacheable(method: &Method) -> bool {
682-
// Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods
683-
// Safe: https://httpwg.org/specs/rfc7231.html#safe.methods
684-
method == Method::GET || method == Method::HEAD
685-
}
686-
687668
/// Ensures that `id` is not in the `Idle` state.
688669
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
689670
if let Ok(next) = self.next_stream_id {
@@ -1057,25 +1038,3 @@ impl<T> From<RecvError> for RecvHeaderBlockError<T> {
10571038
RecvHeaderBlockError::State(err)
10581039
}
10591040
}
1060-
1061-
// ===== util =====
1062-
1063-
fn parse_u64(src: &[u8]) -> Result<u64, ()> {
1064-
if src.len() > 19 {
1065-
// At danger for overflow...
1066-
return Err(());
1067-
}
1068-
1069-
let mut ret = 0;
1070-
1071-
for &d in src {
1072-
if d < b'0' || d > b'9' {
1073-
return Err(());
1074-
}
1075-
1076-
ret *= 10;
1077-
ret += u64::from(d - b'0');
1078-
}
1079-
1080-
Ok(ret)
1081-
}

0 commit comments

Comments
 (0)