Skip to content

Commit 4dedf74

Browse files
committed
refactor(http): move upgrade state from body to head
Move state required for protocol upgrades to head representations, instead of associating it with the body. Closes #2340. Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
1 parent b4e2433 commit 4dedf74

File tree

5 files changed

+33
-37
lines changed

5 files changed

+33
-37
lines changed

src/body/body.rs

+2-27
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::common::{task, watch, Pin, Poll};
2323
use crate::common::{Future, Never};
2424
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
2525
use crate::proto::h2::ping;
26-
use crate::upgrade::OnUpgrade;
2726

2827
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
2928

@@ -70,7 +69,6 @@ struct Extra {
7069
/// a brand new connection, since the pool didn't know about the idle
7170
/// connection yet.
7271
delayed_eof: Option<DelayEof>,
73-
on_upgrade: OnUpgrade,
7472
}
7573

7674
#[cfg(any(feature = "http1", feature = "http2"))]
@@ -187,17 +185,6 @@ impl Body {
187185
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
188186
}
189187

190-
// TODO: Eventually the pending upgrade should be stored in the
191-
// `Extensions`, and all these pieces can be removed. In v0.14, we made
192-
// the breaking changes, so now this TODO can be done without breakage.
193-
pub(crate) fn take_upgrade(&mut self) -> OnUpgrade {
194-
if let Some(ref mut extra) = self.extra {
195-
std::mem::replace(&mut extra.on_upgrade, OnUpgrade::none())
196-
} else {
197-
OnUpgrade::none()
198-
}
199-
}
200-
201188
fn new(kind: Kind) -> Body {
202189
Body { kind, extra: None }
203190
}
@@ -217,14 +204,6 @@ impl Body {
217204
body
218205
}
219206

220-
#[cfg(feature = "http1")]
221-
pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
222-
debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade");
223-
let extra = self.extra_mut();
224-
debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice");
225-
extra.on_upgrade = upgrade;
226-
}
227-
228207
#[cfg(any(feature = "http1", feature = "http2"))]
229208
#[cfg(feature = "client")]
230209
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
@@ -239,12 +218,8 @@ impl Body {
239218

240219
#[cfg(any(feature = "http1", feature = "http2"))]
241220
fn extra_mut(&mut self) -> &mut Extra {
242-
self.extra.get_or_insert_with(|| {
243-
Box::new(Extra {
244-
delayed_eof: None,
245-
on_upgrade: OnUpgrade::none(),
246-
})
247-
})
221+
self.extra
222+
.get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
248223
}
249224

250225
fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {

src/proto/h1/dispatch.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::common::{task, Future, Pin, Poll, Unpin};
1010
use crate::proto::{
1111
BodyLength, Conn, Dispatched, MessageHead, RequestHead,
1212
};
13+
use crate::upgrade::OnUpgrade;
1314

1415
pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
1516
conn: Conn<I, Bs::Data, T>,
@@ -243,8 +244,8 @@ where
243244
}
244245
// dispatch is ready for a message, try to read one
245246
match ready!(self.conn.poll_read_head(cx)) {
246-
Some(Ok((head, body_len, wants))) => {
247-
let mut body = match body_len {
247+
Some(Ok((mut head, body_len, wants))) => {
248+
let body = match body_len {
248249
DecodedLength::ZERO => Body::empty(),
249250
other => {
250251
let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
@@ -253,7 +254,10 @@ where
253254
}
254255
};
255256
if wants.contains(Wants::UPGRADE) {
256-
body.set_on_upgrade(self.conn.on_upgrade());
257+
let upgrade = self.conn.on_upgrade();
258+
debug_assert!(!upgrade.is_none(), "empty upgrade");
259+
debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set");
260+
head.extensions.insert(upgrade);
257261
}
258262
self.dispatch.recv_msg(Ok((head, body)))?;
259263
Poll::Ready(Ok(()))
@@ -488,6 +492,7 @@ cfg_server! {
488492
version: parts.version,
489493
subject: parts.status,
490494
headers: parts.headers,
495+
extensions: http::Extensions::default(),
491496
};
492497
Poll::Ready(Some(Ok((head, body))))
493498
} else {
@@ -506,6 +511,7 @@ cfg_server! {
506511
*req.uri_mut() = msg.subject.1;
507512
*req.headers_mut() = msg.headers;
508513
*req.version_mut() = msg.version;
514+
*req.extensions_mut() = msg.extensions;
509515
let fut = self.service.call(req);
510516
self.in_flight.set(Some(fut));
511517
Ok(())
@@ -570,6 +576,7 @@ cfg_client! {
570576
version: parts.version,
571577
subject: crate::proto::RequestLine(parts.method, parts.uri),
572578
headers: parts.headers,
579+
extensions: http::Extensions::default(),
573580
};
574581
*this.callback = Some(cb);
575582
Poll::Ready(Some(Ok((head, body))))
@@ -594,6 +601,7 @@ cfg_client! {
594601
*res.status_mut() = msg.subject;
595602
*res.headers_mut() = msg.headers;
596603
*res.version_mut() = msg.version;
604+
*res.extensions_mut() = msg.extensions;
597605
cb.send(Ok(res));
598606
Ok(())
599607
} else {

src/proto/h1/role.rs

+2
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ impl Http1Transaction for Server {
270270
version,
271271
subject,
272272
headers,
273+
extensions: http::Extensions::default(),
273274
},
274275
decode: decoder,
275276
expect_continue,
@@ -713,6 +714,7 @@ impl Http1Transaction for Client {
713714
version,
714715
subject: status,
715716
headers,
717+
extensions: http::Extensions::default(),
716718
};
717719
if let Some((decode, is_upgrade)) = Client::decoder(&head, ctx.req_method)? {
718720
return Ok(Some(ParsedMessage {

src/proto/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ cfg_http2! {
1616
}
1717

1818
/// An Incoming Message head. Includes request/status line, and headers.
19-
#[derive(Clone, Debug, Default, PartialEq)]
19+
#[derive(Debug, Default)]
2020
pub struct MessageHead<S> {
2121
/// HTTP version of the message.
2222
pub version: http::Version,
2323
/// Subject (request line or status line) of Incoming message.
2424
pub subject: S,
2525
/// Headers of the Incoming message.
2626
pub headers: http::HeaderMap,
27+
28+
/// Extensions.
29+
extensions: http::Extensions,
2730
}
2831

2932
/// An incoming request message.

src/upgrade.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -317,26 +317,34 @@ mod sealed {
317317
}
318318

319319
impl CanUpgrade for http::Request<crate::Body> {
320-
fn on_upgrade(self) -> OnUpgrade {
321-
self.into_body().take_upgrade()
320+
fn on_upgrade(mut self) -> OnUpgrade {
321+
self.extensions_mut()
322+
.remove::<OnUpgrade>()
323+
.unwrap_or_else(OnUpgrade::none)
322324
}
323325
}
324326

325327
impl CanUpgrade for &'_ mut http::Request<crate::Body> {
326328
fn on_upgrade(self) -> OnUpgrade {
327-
self.body_mut().take_upgrade()
329+
self.extensions_mut()
330+
.remove::<OnUpgrade>()
331+
.unwrap_or_else(OnUpgrade::none)
328332
}
329333
}
330334

331335
impl CanUpgrade for http::Response<crate::Body> {
332-
fn on_upgrade(self) -> OnUpgrade {
333-
self.into_body().take_upgrade()
336+
fn on_upgrade(mut self) -> OnUpgrade {
337+
self.extensions_mut()
338+
.remove::<OnUpgrade>()
339+
.unwrap_or_else(OnUpgrade::none)
334340
}
335341
}
336342

337343
impl CanUpgrade for &'_ mut http::Response<crate::Body> {
338344
fn on_upgrade(self) -> OnUpgrade {
339-
self.body_mut().take_upgrade()
345+
self.extensions_mut()
346+
.remove::<OnUpgrade>()
347+
.unwrap_or_else(OnUpgrade::none)
340348
}
341349
}
342350
}

0 commit comments

Comments
 (0)