Skip to content

Commit

Permalink
feat: Adopt quick_xml to parse xml (#164)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Mar 21, 2022
1 parent 8227a5d commit 1674f77
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 112 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ log = "0.4"
metrics = "0.18"
once_cell = "1"
pin-project = "1"
quick-xml = { version = "0.22.0", features = ["serialize"] }
reqsign = "0.0.1"
reqwest = { version = "0.11", features = ["stream"] }
roxmltree = "0.14"
serde = { version = "1.0.136", features = ["derive"] }
thiserror = "1"
time = "0.3.7"
tokio = { version = "1.17", features = ["full"] }
tower = "0.4"
time = "0.3.7"

[dev-dependencies]
anyhow = "1.0"
Expand Down
172 changes: 62 additions & 110 deletions src/services/s3/object_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use bstr::ByteSlice;
use bytes::BufMut;
use anyhow::anyhow;
use bytes::{Buf, BufMut};
use futures::future::BoxFuture;
use futures::ready;
use futures::StreamExt;
use log::debug;
use quick_xml::de;
use serde::Deserialize;

use super::Backend;
use crate::error::{Error, Kind, Result};
Expand All @@ -43,7 +44,7 @@ pub struct S3ObjectStream {
enum State {
Idle,
Sending(BoxFuture<'static, Result<bytes::Bytes>>),
Listing((ListOutput, usize, usize)),
Listing((Output, usize, usize)),
}

impl S3ObjectStream {
Expand Down Expand Up @@ -102,18 +103,37 @@ impl futures::Stream for S3ObjectStream {
self.poll_next(cx)
}
State::Sending(fut) => {
let output = ListOutput::parse(ready!(Pin::new(fut).poll(cx))?)?;

self.done = !output.is_truncated;
self.token = output.next_continuation_token.clone();
let bs = ready!(Pin::new(fut).poll(cx))?;
let output: Output = de::from_reader(bs.reader()).map_err(|e| Error::Object {
kind: Kind::Unexpected,
op: "list",
path: self.path.clone(),
source: anyhow!("deserialize list_bucket output: {:?}", e),
})?;

// Try our best to check whether this list is done.
//
// - Check `is_truncated`
// - Check `next_continuation_token`
// - Check the length of `common_prefixes` and `contents` (a very rare case)
self.done = if let Some(is_truncated) = output.is_truncated {
!is_truncated
} else if let Some(next_continuation_token) =
output.next_continuation_token.as_ref()
{
next_continuation_token.is_empty()
} else {
output.common_prefixes.is_empty() && output.contents.is_empty()
};
self.token = output.next_continuation_token.clone().unwrap_or_default();
self.state = State::Listing((output, 0, 0));
self.poll_next(cx)
}
State::Listing((output, common_prefixes_idx, objects_idx)) => {
let prefixes = &output.common_prefixes;
if *common_prefixes_idx < prefixes.len() {
*common_prefixes_idx += 1;
let prefix = &prefixes[*common_prefixes_idx - 1];
let prefix = &prefixes[*common_prefixes_idx - 1].prefix;

let mut o =
Object::new(Arc::new(backend.clone()), &backend.get_rel_path(prefix));
Expand Down Expand Up @@ -165,107 +185,35 @@ impl futures::Stream for S3ObjectStream {
}
}

#[derive(Default, Debug)]
struct ListOutput {
is_truncated: bool,
next_continuation_token: String,
common_prefixes: Vec<String>,
contents: Vec<ListOutputContent>,
/// Output of ListBucket/ListObjects.
///
/// ## Note
///
/// `is_truncated` and `next_continuation_token` are `Option`s so that an
/// absent element can be told apart from a present one, which keeps us
/// compatible with more S3 services.
///
/// `serde(default)` is enabled so that deserialization keeps going even when
/// some fields are missing from the response.
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
struct Output {
/// Whether the service reports the listing as truncated; `None` when the
/// element is absent from the response.
is_truncated: Option<bool>,
/// Token to continue the listing with; `None` when the element is absent.
next_continuation_token: Option<String>,
/// Common prefixes returned by this listing.
common_prefixes: Vec<OutputCommonPrefix>,
/// Object entries returned by this listing.
contents: Vec<OutputContent>,
}

#[derive(Default, Debug, Eq, PartialEq)]
struct ListOutputContent {
/// A single object entry in the `contents` of a list output.
///
/// Field names are mapped with `PascalCase`, so `key` and `size` are read
/// from the `Key` and `Size` elements. No `serde(default)` is set on this
/// struct.
#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct OutputContent {
/// Key of the object, as returned by the service.
key: String,
/// Size of the object as reported by the service (presumably in bytes).
size: u64,
}

impl ListOutput {
// TODO: we need a better XML deserialize.
fn parse(bs: bytes::Bytes) -> Result<ListOutput> {
let root = roxmltree::Document::parse(
bs.as_bytes().to_str().expect("content must be valid utf-8"),
)
.map_err(|e| Error::Unexpected(anyhow::Error::from(e)))?;

let mut output = ListOutput::default();

// IsTruncated
if let Some(n) = root
.descendants()
.find(|n| n.tag_name().name() == "IsTruncated")
{
output.is_truncated = n
.text()
.unwrap_or("false")
.parse::<bool>()
.map_err(|e| invalid_list_object_response(&e.to_string()))?;
}

// NextContinuationToken
if let Some(n) = root
.descendants()
.find(|n| n.tag_name().name() == "NextContinuationToken")
{
output.next_continuation_token = n.text().unwrap_or_default().to_string();
}

// CommonPrefixes
for item in root
.descendants()
.filter(|v| v.tag_name().name() == "CommonPrefixes")
{
output.common_prefixes.push(
item.children()
.find(|v| v.tag_name().name() == "Prefix")
.ok_or_else(|| invalid_list_object_response("Prefix is not found"))?
.text()
.ok_or_else(|| invalid_list_object_response("Prefix is empty"))?
.to_string(),
)
}

// Contents
for item in root
.descendants()
.filter(|v| v.tag_name().name() == "Contents")
{
let mut content = ListOutputContent::default();

// Key
let n = item
.children()
.find(|n| n.tag_name().name() == "Key")
.ok_or_else(|| invalid_list_object_response("Key is not found"))?;
content.key = n
.text()
.ok_or_else(|| invalid_list_object_response("Key is empty"))?
.to_string();

// Size
let n = item
.children()
.find(|n| n.tag_name().name() == "Size")
.ok_or_else(|| invalid_list_object_response("Size is not found"))?;
content.size = n
.text()
.ok_or_else(|| invalid_list_object_response("Size is empty"))?
.parse::<u64>()
.map_err(|e| invalid_list_object_response(&e.to_string()))?;

output.contents.push(content)
}

Ok(output)
}
}

fn invalid_list_object_response(cause: &str) -> Error {
Error::Object {
kind: Kind::Unexpected,
op: "list",
path: "".to_string(),
source: anyhow!("invalid list object response: {}", cause),
}
/// A single entry in the `common_prefixes` of a list output.
///
/// Field names are mapped with `PascalCase`, so `prefix` is read from the
/// `Prefix` element.
#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct OutputCommonPrefix {
/// The prefix string, as returned by the service.
prefix: String,
}

#[cfg(test)]
Expand Down Expand Up @@ -306,22 +254,26 @@ mod tests {
</ListBucketResult>"#,
);

let out = ListOutput::parse(bs).expect("must success");
let out: Output = de::from_reader(bs.reader()).expect("must success");

assert!(!out.is_truncated);
assert!(out.next_continuation_token.is_empty());
println!("{:?}", out);
assert!(!out.is_truncated.unwrap());
assert!(out.next_continuation_token.is_none());
assert_eq!(
out.common_prefixes,
out.common_prefixes
.iter()
.map(|v| v.prefix.clone())
.collect::<Vec<String>>(),
vec!["photos/2006/February/", "photos/2006/January/"]
);
assert_eq!(
out.contents,
vec![
ListOutputContent {
OutputContent {
key: "photos/2006".to_string(),
size: 56
},
ListOutputContent {
OutputContent {
key: "photos/2007".to_string(),
size: 100
}
Expand Down
2 changes: 1 addition & 1 deletion tests/behavior/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl BehaviorTest {
);

// Step 5: List this dir, we should get this file.
let mut obs = self.op.objects("").map(|o| o.expect("list object: {}"));
let mut obs = self.op.objects("").map(|o| o.expect("list object"));
let mut found = false;
while let Some(o) = obs.next().await {
let meta = o.metadata().await?;
Expand Down

0 comments on commit 1674f77

Please sign in to comment.