From 2e0f9dc207d8ce49740d854b50c4f850383d9a00 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 9 Aug 2022 13:48:04 +0100 Subject: [PATCH 1/2] Update to pre-release object_store --- Cargo.toml | 3 +++ .../src/datasource/file_format/parquet.rs | 19 +++++++++++++++++- .../file_format/chunked_store.rs | 18 ++++++++++++++++- datafusion/core/tests/path_partition.rs | 20 ++++++++++++++++++- 4 files changed, 57 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ab3f427e49be..9302add8ee46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,3 +34,6 @@ exclude = ["datafusion-cli"] [profile.release] codegen-units = 1 lto = true + +[patch.crates-io] +object_store = { git = "https://github.com/tustvold/arrow-rs", rev = "88a940865d163739141fda9eac10ff42a7b86481", package = "object_store" } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index dfd08352dffb..37c3a8533c70 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -575,7 +575,8 @@ mod tests { use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::path::Path; - use object_store::{GetResult, ListResult}; + use object_store::{GetResult, ListResult, MultipartId}; + use tokio::io::AsyncWrite; #[tokio::test] async fn read_merged_batches() -> Result<()> { @@ -649,6 +650,22 @@ mod tests { Err(object_store::Error::NotImplemented) } + async fn put_multipart( + &self, + _location: &Path, + ) -> object_store::Result<(MultipartId, Box)> + { + Err(object_store::Error::NotImplemented) + } + + async fn abort_multipart( + &self, + _location: &Path, + _multipart_id: &MultipartId, + ) -> object_store::Result<()> { + Err(object_store::Error::NotImplemented) + } + async fn get(&self, _location: &Path) -> object_store::Result { Err(object_store::Error::NotImplemented) } diff --git a/datafusion/core/src/physical_plan/file_format/chunked_store.rs b/datafusion/core/src/physical_plan/file_format/chunked_store.rs index 216926b06713..1a48804a2c55 100644 --- a/datafusion/core/src/physical_plan/file_format/chunked_store.rs +++ b/datafusion/core/src/physical_plan/file_format/chunked_store.rs @@ -20,11 +20,12 @@ use bytes::Bytes; use futures::stream::BoxStream; use futures::StreamExt; use object_store::path::Path; -use object_store::Result; use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore}; +use object_store::{MultipartId, Result}; use std::fmt::{Debug, Display, Formatter}; use std::ops::Range; use std::sync::Arc; +use tokio::io::AsyncWrite; /// Wraps a [`ObjectStore`] and makes its get response return chunks /// @@ -53,6 +54,21 @@ impl ObjectStore for ChunkedStore { self.inner.put(location, bytes).await } + async fn put_multipart( + &self, + location: &Path, + ) -> Result<(MultipartId, Box)> { + self.inner.put_multipart(location).await + } + + async fn abort_multipart( + &self, + location: &Path, + multipart_id: &MultipartId, + ) -> Result<()> { + self.inner.abort_multipart(location, multipart_id).await + } + async fn get(&self, location: &Path) -> Result { let bytes = self.inner.get(location).await?.bytes().await?; let mut offset = 0; diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 821d174f2d99..fca9b9a43b1c 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -40,7 +40,10 @@ use datafusion::{ use datafusion_common::ScalarValue; use futures::stream::BoxStream; use futures::{stream, StreamExt}; -use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore}; +use object_store::{ + path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, +}; +use tokio::io::AsyncWrite; #[tokio::test] async fn parquet_distinct_partition_col() -> Result<()> { @@ -516,6 +519,21 @@ impl ObjectStore for MirroringObjectStore { unimplemented!() } + async fn put_multipart( + &self, + _location: &Path, + ) -> object_store::Result<(MultipartId, Box)> { + unimplemented!() + } + + async fn abort_multipart( + &self, + _location: &Path, + _multipart_id: &MultipartId, + ) -> object_store::Result<()> { + unimplemented!() + } + async fn get(&self, location: &Path) -> object_store::Result { self.files.iter().find(|x| *x == location.as_ref()).unwrap(); let path = std::path::PathBuf::from(&self.mirrored_file); From 9bca915fb4f3302f6167165c271b6fdf6aca4f27 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 15 Aug 2022 09:10:37 +0100 Subject: [PATCH 2/2] Update to object_store 0.4 --- Cargo.toml | 3 --- datafusion/common/Cargo.toml | 2 +- datafusion/core/Cargo.toml | 2 +- .../src/physical_plan/file_format/parquet.rs | 24 +++++++++++++++++++ 4 files changed, 26 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9302add8ee46..ab3f427e49be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,3 @@ exclude = ["datafusion-cli"] [profile.release] codegen-units = 1 lto = true - -[patch.crates-io] -object_store = { git = "https://github.com/tustvold/arrow-rs", rev = "88a940865d163739141fda9eac10ff42a7b86481", package = "object_store" } diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 9bab66adab4e..994242c6832f 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -41,7 +41,7 @@ pyarrow = ["pyo3"] apache-avro = { version = "0.14", features = ["snappy"], optional = true } arrow = { version = "19.0.0", features = ["prettyprint"] } cranelift-module = { version = "0.86.1", optional = true } -object_store = { version = "0.3", optional = true } +object_store = { version = "0.4", optional = true } ordered-float = "3.0" parquet = { version = "19.0.0", features = ["arrow"], optional = true } pyo3 = { version = "0.16", optional = true } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index a76b2fc49b93..3cc01d8c65be 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -75,7 +75,7 @@ lazy_static = { version = "^1.4.0" } log = "^0.4" num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" -object_store = "0.3.0" +object_store = "0.4.0" ordered-float = "3.0" parking_lot = "0.12" parquet = { version = "19.0.0", features = ["arrow", "async"] } diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 184214d6875d..b46f6f6677dc 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -369,6 +369,30 @@ impl AsyncFileReader for ParquetFileReader { .boxed() } + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> + where + Self: Send, + { + let total = ranges.iter().map(|r| r.end - r.start).sum(); + self.metrics.bytes_scanned.add(total); + + async move { + self.store + .get_ranges(&self.meta.location, &ranges) + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_byte_ranges error: {}", + e + )) + }) + } + .boxed() + } + fn get_metadata( &mut self, ) -> BoxFuture<'_, parquet::errors::Result>> {