Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(http_util): Implement multipart/form-data support #2157

Merged
merged 1 commit into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ redis = { version = "0.22", features = [
], optional = true }
reqsign = { version = "0.9.1", default-features = false, optional = true }
reqwest = { version = "0.11.13", features = [
"multipart",
"stream",
], default-features = false }
rocksdb = { version = "0.20.1", default-features = false, optional = true }
Expand Down
16 changes: 3 additions & 13 deletions core/src/raw/http_util/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::Error;
use crate::ErrorKind;
use crate::Result;

use super::multipart::Multipart;

/// Body used in async HTTP requests.
#[derive(Default)]
pub enum AsyncBody {
Expand All @@ -45,19 +47,7 @@ pub enum AsyncBody {
///
/// If input with this field, we will goto the internal multipart
/// handle logic.
Multipart(String, Bytes),
}

impl From<AsyncBody> for reqwest::Body {
fn from(v: AsyncBody) -> Self {
match v {
AsyncBody::Empty => reqwest::Body::from(""),
AsyncBody::Bytes(bs) => reqwest::Body::from(bs),
AsyncBody::Multipart(_, _) => {
unreachable!("reqwest multipart should not be constructed by body")
}
}
}
Multipart(Multipart),
}

type BytesStream = Box<dyn Stream<Item = Result<Bytes>> + Send + Sync + Unpin>;
Expand Down
26 changes: 18 additions & 8 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::mem;
use std::str::FromStr;

use futures::TryStreamExt;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::Request;
use http::Response;
use reqwest::redirect::Policy;
Expand Down Expand Up @@ -94,14 +96,22 @@ impl HttpClient {
.version(parts.version)
.headers(parts.headers);

req_builder = if let AsyncBody::Multipart(field, r) = body {
let mut form = reqwest::multipart::Form::new();
let part = reqwest::multipart::Part::stream(AsyncBody::Bytes(r));
form = form.part(field, part);

req_builder.multipart(form)
} else {
req_builder.body(body)
req_builder = match body {
AsyncBody::Empty => req_builder.body(reqwest::Body::from("")),
AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)),
AsyncBody::Multipart(mp) => {
let (boundary, bs) = mp.build();

// Insert content type with correct boundary.
req_builder = req_builder.header(
CONTENT_TYPE,
format!("multipart/form-data; boundary={boundary}").as_str(),
);
// Insert content length with calculated size.
req_builder = req_builder.header(CONTENT_LENGTH, bs.len());

req_builder.body(reqwest::Body::from(bs))
}
};

let mut resp = req_builder.send().await.map_err(|err| {
Expand Down
4 changes: 4 additions & 0 deletions core/src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,7 @@ pub use bytes_range::BytesRange;

mod bytes_content_range;
pub use bytes_content_range::BytesContentRange;

mod multipart;
pub use multipart::Multipart;
pub use multipart::Part;
245 changes: 245 additions & 0 deletions core/src/raw/http_util/multipart.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use bytes::{Bytes, BytesMut};
use http::{header::CONTENT_DISPOSITION, HeaderMap, HeaderName, HeaderValue};

/// Multipart is a builder for multipart/form-data.
#[derive(Debug)]
pub struct Multipart {
boundary: String,
parts: Vec<Part>,
}

impl Default for Multipart {
fn default() -> Self {
Self::new()
}
}

impl Multipart {
/// Create a new multipart with random boundary.
pub fn new() -> Self {
Multipart {
boundary: uuid::Uuid::new_v4().to_string(),
parts: Vec::default(),
}
}

/// Set the boundary with given string.
#[cfg(test)]
fn with_boundary(mut self, boundary: &str) -> Self {
self.boundary = boundary.to_string();
self
}

/// Insert a part into multipart.
pub fn part(mut self, part: Part) -> Self {
self.parts.push(part);
self
}

pub(crate) fn build(self) -> (String, Bytes) {
let mut bs = BytesMut::new();

// Write headers.
for v in self.parts {
// Write the first boundary
bs.extend_from_slice(b"--");
bs.extend_from_slice(self.boundary.as_bytes());
bs.extend_from_slice(b"\r\n");

bs.extend_from_slice(&v.build());
}

// Write the last boundary
bs.extend_from_slice(b"--");
bs.extend_from_slice(self.boundary.as_bytes());
bs.extend_from_slice(b"--");
bs.extend_from_slice(b"\r\n");

(self.boundary, bs.freeze())
}
}

/// Part is a builder for multipart/form-data part.
#[derive(Debug)]
pub struct Part {
headers: HeaderMap,
content: Bytes,
}

impl Part {
/// Create a new part builder
///
/// # Panics
///
/// Input name must be percent encoded.
pub fn new(name: &str) -> Self {
// Insert content disposition header for part.
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_DISPOSITION,
format!("form-data; name=\"{}\"", name).parse().unwrap(),
);

Self {
headers,
content: Bytes::new(),
}
}

/// Insert a header into part.
pub fn header(mut self, key: HeaderName, value: HeaderValue) -> Self {
self.headers.insert(key, value);
self
}

/// Set the content for this part.
pub fn content(mut self, content: impl Into<Bytes>) -> Self {
self.content = content.into();
self
}

pub(crate) fn build(self) -> Bytes {
let mut bs = BytesMut::new();

// Write headers.
for (k, v) in self.headers.iter() {
bs.extend_from_slice(k.as_str().as_bytes());
bs.extend_from_slice(b": ");
bs.extend_from_slice(v.as_bytes());
bs.extend_from_slice(b"\r\n");
}

// Write content.
bs.extend_from_slice(b"\r\n");
bs.extend_from_slice(&self.content);
bs.extend_from_slice(b"\r\n");

bs.freeze()
}
}

#[cfg(test)]
mod tests {
use super::*;

use http::header::CONTENT_TYPE;
use pretty_assertions::assert_eq;

#[test]
fn test_multipart_basic() {
let multipart = Multipart::new()
.part(Part::new("foo").content(Bytes::from("bar")))
.part(Part::new("hello").content(Bytes::from("world")));

let (boundary, body) = multipart.build();

let expected = format!(
"--{boundary}\r\n\
content-disposition: form-data; name=\"foo\"\r\n\
\r\n\
bar\r\n\
--{boundary}\r\n\
content-disposition: form-data; name=\"hello\"\r\n\
\r\n\
world\r\n\
--{boundary}--\r\n",
);

assert_eq!(expected, String::from_utf8(body.to_vec()).unwrap());
}

/// This test is inspired from <https://docs.aws.amazon.com/AmazonS3/latest/userguide/HTTPPOSTExamples.html>
#[test]
fn test_multipart_s3_form_upload() {
let multipart = Multipart::new()
.with_boundary("9431149156168")
.part(Part::new("key").content("user/eric/MyPicture.jpg"))
.part(Part::new("acl").content("public-read"))
.part(Part::new("success_action_redirect").content(
"https://awsexamplebucket1.s3.us-west-1.amazonaws.com/successful_upload.html",
))
.part(Part::new("Content-Type").content("image/jpeg"))
.part(Part::new("x-amz-meta-uuid").content("14365123651274"))
.part(Part::new("x-amz-meta-tag").content("Some,Tag,For,Picture"))
.part(Part::new("AWSAccessKeyId").content("AKIAIOSFODNN7EXAMPLE"))
.part(Part::new("Policy").content("eyAiZXhwaXJhdGlvbiI6ICIyMDA3LTEyLTAxVDEyOjAwOjAwLjAwMFoiLAogICJjb25kaXRpb25zIjogWwogICAgeyJidWNrZXQiOiAiam9obnNtaXRoIn0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci9lcmljLyJdLAogICAgeyJhY2wiOiAicHVibGljLXJlYWQifSwKICAgIHsic3VjY2Vzc19hY3Rpb25fcmVkaXJlY3QiOiAiaHR0cDovL2pvaG5zbWl0aC5zMy5hbWF6b25hd3MuY29tL3N1Y2Nlc3NmdWxfdXBsb2FkLmh0bWwifSwKICAgIFsic3RhcnRzLXdpdGgiLCAiJENvbnRlbnQtVHlwZSIsICJpbWFnZS8iXSwKICAgIHsieC1hbXotbWV0YS11dWlkIjogIjE0MzY1MTIzNjUxMjc0In0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1tZXRhLXRhZyIsICIiXQogIF0KfQo="))
.part(Part::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU="))
.part(Part::new("file").header(CONTENT_TYPE, "image/jpeg".parse().unwrap()).content("...file content...")).part(Part::new("submit").content("Upload to Amazon S3"));

let (_, body) = multipart.build();

let expected = r#"--9431149156168
content-disposition: form-data; name="key"

user/eric/MyPicture.jpg
--9431149156168
content-disposition: form-data; name="acl"

public-read
--9431149156168
content-disposition: form-data; name="success_action_redirect"

https://awsexamplebucket1.s3.us-west-1.amazonaws.com/successful_upload.html
--9431149156168
content-disposition: form-data; name="Content-Type"

image/jpeg
--9431149156168
content-disposition: form-data; name="x-amz-meta-uuid"

14365123651274
--9431149156168
content-disposition: form-data; name="x-amz-meta-tag"

Some,Tag,For,Picture
--9431149156168
content-disposition: form-data; name="AWSAccessKeyId"

AKIAIOSFODNN7EXAMPLE
--9431149156168
content-disposition: form-data; name="Policy"

eyAiZXhwaXJhdGlvbiI6ICIyMDA3LTEyLTAxVDEyOjAwOjAwLjAwMFoiLAogICJjb25kaXRpb25zIjogWwogICAgeyJidWNrZXQiOiAiam9obnNtaXRoIn0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci9lcmljLyJdLAogICAgeyJhY2wiOiAicHVibGljLXJlYWQifSwKICAgIHsic3VjY2Vzc19hY3Rpb25fcmVkaXJlY3QiOiAiaHR0cDovL2pvaG5zbWl0aC5zMy5hbWF6b25hd3MuY29tL3N1Y2Nlc3NmdWxfdXBsb2FkLmh0bWwifSwKICAgIFsic3RhcnRzLXdpdGgiLCAiJENvbnRlbnQtVHlwZSIsICJpbWFnZS8iXSwKICAgIHsieC1hbXotbWV0YS11dWlkIjogIjE0MzY1MTIzNjUxMjc0In0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1tZXRhLXRhZyIsICIiXQogIF0KfQo=
--9431149156168
content-disposition: form-data; name="Signature"

0RavWzkygo6QX9caELEqKi9kDbU=
--9431149156168
content-disposition: form-data; name="file"
content-type: image/jpeg

...file content...
--9431149156168
content-disposition: form-data; name="submit"

Upload to Amazon S3
--9431149156168--
"#;

assert_eq!(
expected,
// Rust can't represent `\r` in a string literal, so we
// replace `\r\n` with `\n` for comparison
String::from_utf8(body.to_vec())
.unwrap()
.replace("\r\n", "\n")
);
}
}
5 changes: 4 additions & 1 deletion core/src/services/ipmfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ impl oio::Write for IpmfsWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let resp = self
.backend
.ipmfs_write(&self.path, AsyncBody::Multipart("data".to_string(), bs))
.ipmfs_write(
&self.path,
AsyncBody::Multipart(Multipart::new().part(Part::new("data").content(bs))),
)
.await?;

let status = resp.status();
Expand Down