Skip to content

Commit

Permalink
Remove generic parameter from BodyStream
Browse files Browse the repository at this point in the history
I think `BodyStream` is more useful without being generic over the
request body.

I'm also looking into adding a response body from a stream called
`StreamBody` which will work pretty much opposite to this.
  • Loading branch information
davidpdrsn committed Aug 21, 2021
1 parent fbd43c6 commit 80e5892
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`tower::make::Shared` ([#229](https://github.com/tokio-rs/axum/pull/229))
- All usage of `tower::BoxError` has been replaced with `axum::BoxError` ([#229](https://github.com/tokio-rs/axum/pull/229))
- `tower::util::Either` no longer implements `IntoResponse` ([#229](https://github.com/tokio-rs/axum/pull/229))
- `extract::BodyStream` is no longer generic over the request body
- These future types have been moved
- `extract::extractor_middleware::ExtractorMiddlewareResponseFuture` moved
to `extract::extractor_middleware::future::ResponseFuture` ([#133](https://github.com/tokio-rs/axum/pull/133))
Expand Down
2 changes: 1 addition & 1 deletion src/extract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@
//! .route(
//! "/body-stream",
//! // same for `extract::BodyStream`
//! get(|_: extract::BodyStream<MyBody<Body>>| async {}),
//! get(|_: extract::BodyStream| async {}),
//! )
//! .route(
//! // and `Request<_>`
Expand Down
43 changes: 30 additions & 13 deletions src/extract/request_parts.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use super::{rejection::*, take_body, Extension, FromRequest, RequestParts};
use crate::BoxError;
use crate::{BoxError, Error};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::stream::Stream;
use http::{Extensions, HeaderMap, Method, Request, Uri, Version};
use http_body::Body as HttpBody;
use std::{
convert::Infallible,
fmt,
pin::Pin,
task::{Context, Poll},
};
use sync_wrapper::SyncWrapper;

#[async_trait]
impl<B> FromRequest<B> for Request<B>
Expand Down Expand Up @@ -191,34 +194,48 @@ where
/// ```
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
#[derive(Debug)]
pub struct BodyStream<B = crate::body::Body>(B);
pub struct BodyStream(
SyncWrapper<Pin<Box<dyn http_body::Body<Data = Bytes, Error = Error> + Send + 'static>>>,
);

impl<B> Stream for BodyStream<B>
where
B: http_body::Body + Unpin,
{
type Item = Result<B::Data, B::Error>;
impl Stream for BodyStream {
type Item = Result<Bytes, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_data(cx)
Pin::new(self.0.get_mut()).poll_data(cx)
}
}

#[async_trait]
impl<B> FromRequest<B> for BodyStream<B>
impl<B> FromRequest<B> for BodyStream
where
B: http_body::Body + Unpin + Send,
B: HttpBody + Send + 'static,
B::Data: Into<Bytes>,
B::Error: Into<BoxError>,
{
type Rejection = BodyAlreadyExtracted;

async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
let body = take_body(req)?;
let stream = BodyStream(body);
let body = take_body(req)?
.map_data(Into::into)
.map_err(|err| Error::new(err.into()));
let stream = BodyStream(SyncWrapper::new(Box::pin(body)));
Ok(stream)
}
}

#[test]
fn body_stream_traits() {
crate::tests::assert_send::<BodyStream>();
crate::tests::assert_sync::<BodyStream>();
}

impl fmt::Debug for BodyStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("BodyStream").finish()
}
}

/// Extractor that extracts the request body.
///
/// # Example
Expand Down
3 changes: 3 additions & 0 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,6 @@ where

addr
}

pub(crate) fn assert_send<T: Send>() {}
pub(crate) fn assert_sync<T: Sync>() {}

0 comments on commit 80e5892

Please # to comment.