From 8c55de45f50714eb0c7357aa8a4898dfa4dbdaac Mon Sep 17 00:00:00 2001
From: Renjie Liu
Date: Wed, 20 Sep 2023 14:15:51 +0800
Subject: [PATCH] feat: Introduce FileIO (#53)

* feat: Introduce FileIO

* Sort

* fix

* Fix typo

* Fix comments

* Rename InputStream to FileRead

* FileWrite trait

* Reject non-absolute local paths instead of slicing them by byte index
---
 crates/iceberg/Cargo.toml   |   5 +
 crates/iceberg/src/error.rs |  12 +
 crates/iceberg/src/io.rs    | 473 ++++++++++++++++++++++++++++++++++++
 crates/iceberg/src/lib.rs   |   1 +
 4 files changed, 491 insertions(+)
 create mode 100644 crates/iceberg/src/io.rs

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index d02573423..007c5c2d2 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -34,9 +34,11 @@ bitvec = "1.0.1"
 chrono = "0.4"
 derive_builder = "0.12.0"
 either = "1"
+futures = "0.3"
 itertools = "0.11"
 lazy_static = "1"
 once_cell = "1"
+opendal = "0.39"
 ordered-float = "3.7.0"
 rust_decimal = "1.31.0"
 serde = { version = "^1.0", features = ["rc"] }
@@ -44,7 +46,10 @@ serde_bytes = "0.11.8"
 serde_derive = "^1.0"
 serde_json = "^1.0"
 serde_repr = "0.1.16"
+url = "2"
 uuid = "1.4.1"
 
 [dev-dependencies]
 pretty_assertions = "1.4.0"
+tempdir = "0.3"
+tokio = { version = "1", features = ["macros"] }
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 48bdebc53..e4ae576d8 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -295,6 +295,18 @@ define_from_err!(
     "Failure in conversion with avro"
 );
 
+define_from_err!(
+    opendal::Error,
+    ErrorKind::Unexpected,
+    "Failure in doing io operation"
+);
+
+define_from_err!(
+    url::ParseError,
+    ErrorKind::DataInvalid,
+    "Failed to parse url"
+);
+
 /// Helper macro to check arguments.
 ///
 ///
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
new file mode 100644
index 000000000..9d55aac81
--- /dev/null
+++ b/crates/iceberg/src/io.rs
@@ -0,0 +1,473 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! File io implementation.
+//!
+//! # How to build `FileIO`
+//!
+//! We provide a `FileIOBuilder` to build `FileIO` from scratch. For example:
+//! ```rust
+//! use iceberg::io::{FileIOBuilder, S3_REGION};
+//!
+//! let file_io = FileIOBuilder::new("s3")
+//!     .with_prop(S3_REGION, "us-east-1")
+//!     .build()
+//!     .unwrap();
+//! ```
+//!
+//! # How to use `FileIO`
+//!
+//! Currently `FileIO` provides simple methods for file operations:
+//!
+//! - `delete`: Delete file.
+//! - `is_exist`: Check if file exists.
+//! - `new_input`: Create input file for reading.
+//! - `new_output`: Create output file for writing.
+
+use std::{collections::HashMap, sync::Arc};
+
+use crate::{error::Result, Error, ErrorKind};
+use futures::{AsyncRead, AsyncSeek, AsyncWrite};
+use once_cell::sync::Lazy;
+use opendal::{Operator, Scheme};
+use url::Url;
+
+/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
+/// S3 endpoint.
+pub const S3_ENDPOINT: &str = "s3.endpoint";
+/// S3 access key id.
+pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
+/// S3 secret access key.
+pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key";
+/// S3 region.
+pub const S3_REGION: &str = "s3.region";
+
+/// A mapping from iceberg s3 configuration key to [`opendal::Operator`] configuration key.
+static S3_CONFIG_MAPPING: Lazy<HashMap<&'static str, &'static str>> = Lazy::new(|| {
+    let mut m = HashMap::with_capacity(4);
+    m.insert(S3_ENDPOINT, "endpoint");
+    m.insert(S3_ACCESS_KEY_ID, "access_key_id");
+    m.insert(S3_SECRET_ACCESS_KEY, "secret_access_key");
+    m.insert(S3_REGION, "region");
+
+    m
+});
+
+const DEFAULT_ROOT_PATH: &str = "/";
+/// FileIO implementation, used to manipulate files in underlying storage.
+///
+/// # Note
+///
+/// All paths passed to `FileIO` must be absolute paths starting with the scheme string used to construct `FileIO`.
+/// For example, if you construct `FileIO` with `s3a` scheme, then all paths passed to `FileIO` must start with `s3a://`.
+#[derive(Clone, Debug)]
+pub struct FileIO {
+    inner: Arc<Storage>,
+}
+
+/// Builder for [`FileIO`].
+pub struct FileIOBuilder {
+    /// This is used to infer scheme of operator.
+    ///
+    /// If this is `None`, then [`FileIOBuilder::build`](FileIOBuilder::build) will build a local file io.
+    scheme_str: Option<String>,
+    /// Arguments for operator.
+    props: HashMap<String, String>,
+}
+
+impl FileIOBuilder {
+    /// Creates a new builder with scheme.
+    pub fn new(scheme_str: impl ToString) -> Self {
+        Self {
+            scheme_str: Some(scheme_str.to_string()),
+            props: HashMap::default(),
+        }
+    }
+
+    /// Creates a new builder for local file io.
+    pub fn new_fs_io() -> Self {
+        Self {
+            scheme_str: None,
+            props: HashMap::default(),
+        }
+    }
+
+    /// Add argument for operator.
+    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
+        self.props.insert(key.to_string(), value.to_string());
+        self
+    }
+
+    /// Add arguments for operator.
+    pub fn with_props(
+        mut self,
+        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
+    ) -> Self {
+        self.props
+            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
+        self
+    }
+
+    /// Builds [`FileIO`].
+    pub fn build(self) -> Result<FileIO> {
+        let storage = Storage::build(self)?;
+        Ok(FileIO {
+            inner: Arc::new(storage),
+        })
+    }
+}
+
+impl FileIO {
+    /// Deletes file.
+    pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
+        let (op, relative_path) = self.inner.create_operator(&path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    /// Check file exists.
+    pub async fn is_exist(&self, path: impl AsRef<str>) -> Result<bool> {
+        let (op, relative_path) = self.inner.create_operator(&path)?;
+        Ok(op.is_exist(relative_path).await?)
+    }
+
+    /// Creates input file.
+    pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
+        let (op, relative_path) = self.inner.create_operator(&path)?;
+        let path = path.as_ref().to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(InputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
+    }
+
+    /// Creates output file.
+    pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
+        let (op, relative_path) = self.inner.create_operator(&path)?;
+        let path = path.as_ref().to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(OutputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
+    }
+}
+
+/// Input file is used for reading from files.
+#[derive(Debug)]
+pub struct InputFile {
+    op: Operator,
+    // Absolute path of file.
+    path: String,
+    // Relative path of file to uri, starts at [`relative_path_pos`]
+    relative_path_pos: usize,
+}
+
+/// Trait for reading file.
+pub trait FileRead: AsyncRead + AsyncSeek {}
+
+impl<T> FileRead for T where T: AsyncRead + AsyncSeek {}
+
+impl InputFile {
+    /// Absolute path to root uri.
+    pub fn location(&self) -> &str {
+        &self.path
+    }
+
+    /// Check if file exists.
+    pub async fn exists(&self) -> Result<bool> {
+        Ok(self
+            .op
+            .is_exist(&self.path[self.relative_path_pos..])
+            .await?)
+    }
+
+    /// Creates [`FileRead`] for reading.
+    pub async fn reader(&self) -> Result<impl FileRead> {
+        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+    }
+}
+
+/// Trait for writing file.
+pub trait FileWrite: AsyncWrite {}
+
+impl<T> FileWrite for T where T: AsyncWrite {}
+
+/// Output file is used for writing to files.
+#[derive(Debug)]
+pub struct OutputFile {
+    op: Operator,
+    // Absolute path of file.
+    path: String,
+    // Relative path of file to uri, starts at [`relative_path_pos`]
+    relative_path_pos: usize,
+}
+
+impl OutputFile {
+    /// Absolute path to root uri.
+    pub fn location(&self) -> &str {
+        &self.path
+    }
+
+    /// Checks if file exists.
+    pub async fn exists(&self) -> Result<bool> {
+        Ok(self
+            .op
+            .is_exist(&self.path[self.relative_path_pos..])
+            .await?)
+    }
+
+    /// Converts into [`InputFile`].
+    pub fn to_input_file(self) -> InputFile {
+        InputFile {
+            op: self.op,
+            path: self.path,
+            relative_path_pos: self.relative_path_pos,
+        }
+    }
+
+    /// Creates output file for writing.
+    pub async fn writer(&self) -> Result<impl FileWrite> {
+        Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+    }
+}
+
+// We introduce this because I don't want to handle unsupported `Scheme` in every method.
+#[derive(Debug)]
+enum Storage {
+    LocalFs {
+        op: Operator,
+    },
+    S3 {
+        scheme_str: String,
+        props: HashMap<String, String>,
+    },
+}
+
+impl Storage {
+    /// Creates operator from path.
+    ///
+    /// # Arguments
+    ///
+    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
+    ///
+    /// # Returns
+    ///
+    /// The return value consists of two parts:
+    ///
+    /// * An [`opendal::Operator`] instance used to operate on file.
+    /// * Relative path to the root uri of [`opendal::Operator`].
+    ///
+    /// # Errors
+    ///
+    /// Fails if `path` is not absolute for this storage or the operator cannot be created.
+    fn create_operator<'a>(&self, path: &'a impl AsRef<str>) -> Result<(Operator, &'a str)> {
+        let path = path.as_ref();
+        match self {
+            Storage::LocalFs { op } => {
+                // Strip the scheme or leading slash without slicing by byte index, which
+                // panicked on "" and silently dropped a character of relative paths.
+                let relative = path
+                    .strip_prefix("file:/")
+                    .or_else(|| path.strip_prefix('/'))
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("Invalid local path: {}, should be absolute", path),
+                        )
+                    })?;
+                Ok((op.clone(), relative))
+            }
+            Storage::S3 { scheme_str, props } => {
+                let mut props = props.clone();
+                let url = Url::parse(path)?;
+                let bucket = url.host_str().ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid s3 url: {}, missing bucket", path),
+                    )
+                })?;
+
+                props.insert("bucket".to_string(), bucket.to_string());
+
+                let prefix = format!("{}://{}/", scheme_str, bucket);
+                if path.starts_with(&prefix) {
+                    Ok((Operator::via_map(Scheme::S3, props)?, &path[prefix.len()..]))
+                } else {
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Invalid s3 url: {}, should start with {}", path, prefix),
+                    ))
+                }
+            }
+        }
+    }
+
+    /// Parse scheme.
+    fn parse_scheme(scheme: &str) -> Result<Scheme> {
+        match scheme {
+            "file" | "" => Ok(Scheme::Fs),
+            "s3" | "s3a" => Ok(Scheme::S3),
+            s => Ok(s.parse::<Scheme>()?),
+        }
+    }
+
+    /// Convert iceberg config to opendal config.
+    fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
+        let scheme_str = file_io_builder.scheme_str.unwrap_or_default();
+        let scheme = Self::parse_scheme(&scheme_str)?;
+        let mut new_props = HashMap::default();
+        new_props.insert("root".to_string(), DEFAULT_ROOT_PATH.to_string());
+
+        match scheme {
+            Scheme::Fs => Ok(Self::LocalFs {
+                op: Operator::via_map(Scheme::Fs, new_props)?,
+            }),
+            Scheme::S3 => {
+                for prop in file_io_builder.props {
+                    if let Some(op_key) = S3_CONFIG_MAPPING.get(prop.0.as_str()) {
+                        new_props.insert(op_key.to_string(), prop.1);
+                    }
+                }
+
+                Ok(Self::S3 {
+                    scheme_str,
+                    props: new_props,
+                })
+            }
+            _ => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!("Constructing file io from scheme: {scheme} not supported now",),
+            )),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::io::Write;
+
+    use std::{fs::File, path::Path};
+
+    use futures::io::AllowStdIo;
+    use futures::{AsyncReadExt, AsyncWriteExt};
+
+    use tempdir::TempDir;
+
+    use super::{FileIO, FileIOBuilder};
+
+    fn create_local_file_io() -> FileIO {
+        FileIOBuilder::new_fs_io().build().unwrap()
+    }
+
+    fn write_to_file<P: AsRef<Path>>(s: &str, path: P) {
+        let mut f = File::create(path).unwrap();
+        write!(f, "{s}").unwrap();
+    }
+
+    async fn read_from_file<P: AsRef<Path>>(path: P) -> String {
+        let mut f = AllowStdIo::new(File::open(path).unwrap());
+        let mut s = String::new();
+        f.read_to_string(&mut s).await.unwrap();
+        s
+    }
+
+    #[tokio::test]
+    async fn test_local_input_file() {
+        let tmp_dir = TempDir::new("test").unwrap();
+
+        let file_name = "a.txt";
+        let content = "Iceberg loves rust.";
+
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
+        write_to_file(content, &full_path);
+
+        let file_io = create_local_file_io();
+        let input_file = file_io.new_input(&full_path).unwrap();
+
+        assert!(input_file.exists().await.unwrap());
+        // `location` returns the path exactly as it was passed in
+        assert_eq!(&full_path, input_file.location());
+        let read_content = read_from_file(full_path).await;
+
+        assert_eq!(content, &read_content);
+    }
+
+    #[tokio::test]
+    async fn test_delete_local_file() {
+        let tmp_dir = TempDir::new("test").unwrap();
+
+        let file_name = "a.txt";
+        let content = "Iceberg loves rust.";
+
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
+        write_to_file(content, &full_path);
+
+        let file_io = create_local_file_io();
+        assert!(file_io.is_exist(&full_path).await.unwrap());
+        file_io.delete(&full_path).await.unwrap();
+        assert!(!file_io.is_exist(&full_path).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_delete_non_exist_file() {
+        let tmp_dir = TempDir::new("test").unwrap();
+
+        let file_name = "a.txt";
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
+
+        let file_io = create_local_file_io();
+        assert!(!file_io.is_exist(&full_path).await.unwrap());
+        assert!(file_io.delete(&full_path).await.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_local_output_file() {
+        let tmp_dir = TempDir::new("test").unwrap();
+
+        let file_name = "a.txt";
+        let content = "Iceberg loves rust.";
+
+        let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);
+
+        let file_io = create_local_file_io();
+        let output_file = file_io.new_output(&full_path).unwrap();
+
+        assert!(!output_file.exists().await.unwrap());
+        {
+            let mut writer = output_file.writer().await.unwrap();
+            writer.write_all(content.as_bytes()).await.unwrap();
+            writer.close().await.unwrap();
+        }
+
+        assert_eq!(&full_path, output_file.location());
+
+        let read_content = read_from_file(full_path).await;
+
+        assert_eq!(content, &read_content);
+    }
+
+    #[test]
+    fn test_local_path_must_be_absolute() {
+        let file_io = create_local_file_io();
+        assert!(file_io.new_input("a.txt").is_err());
+        assert!(file_io.new_output("").is_err());
+    }
+}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 5ef9ad300..93413d75b 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -28,4 +28,5 @@ pub use error::ErrorKind;
 pub use error::Result;
 
 mod avro;
+pub mod io;
 pub mod spec;