Skip to content

Commit

Permalink
Merge branch 'main' into decimal-bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Oct 28, 2024
2 parents c2e8f6c + 2f3554b commit fe5df3f
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 71 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bindings_python_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
- uses: actions/checkout@v4
- name: Install tools
run: |
pip install ruff --break-system-packages
pip install ruff
- name: Check format
working-directory: "bindings/python"
run: |
Expand Down Expand Up @@ -78,6 +78,6 @@ jobs:
shell: bash
run: |
set -e
pip install hatch==1.12.0 --break-system-packages
pip install hatch==1.12.0
hatch run dev:pip install dist/pyiceberg_core-*.whl --force-reinstall
hatch run dev:test
2 changes: 1 addition & 1 deletion .github/workflows/ci_typos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Check typos
uses: crate-ci/typos@v1.26.0
uses: crate-ci/typos@v1.26.8
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ mockito = "1"
murmur3 = "0.5.2"
num-bigint = "0.4.6"
once_cell = "1"
opendal = "0.50"
opendal = "0.50.1"
ordered-float = "4"
parquet = "53"
paste = "1"
Expand Down
2 changes: 1 addition & 1 deletion crates/catalog/glue/tests/glue_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ async fn test_create_table() -> Result<()> {
assert!(
catalog
.file_io()
.is_exist("s3a://warehouse/hive/metadata/")
.exists("s3a://warehouse/hive/metadata/")
.await?
);

Expand Down
2 changes: 1 addition & 1 deletion crates/catalog/hms/tests/hms_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ async fn test_create_table() -> Result<()> {
assert!(
catalog
.file_io()
.is_exist("s3a://warehouse/hive/metadata/")
.exists("s3a://warehouse/hive/metadata/")
.await?
);

Expand Down
29 changes: 11 additions & 18 deletions crates/iceberg/src/arrow/record_batch_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;

use arrow_array::{
Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
Int32Array, Int64Array, NullArray, RecordBatch, StringArray,
Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
};
use arrow_cast::cast;
use arrow_schema::{
Expand Down Expand Up @@ -124,19 +124,7 @@ impl RecordBatchTransformer {
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: &[i32],
) -> Self {
let projected_iceberg_field_ids = if projected_iceberg_field_ids.is_empty() {
// If the list of field ids is empty, this indicates that we
// need to select all fields.
// Project all fields in table schema order
snapshot_schema
.as_struct()
.fields()
.iter()
.map(|field| field.id)
.collect()
} else {
projected_iceberg_field_ids.to_vec()
};
let projected_iceberg_field_ids = projected_iceberg_field_ids.to_vec();

Self {
snapshot_schema,
Expand All @@ -154,10 +142,15 @@ impl RecordBatchTransformer {
Some(BatchTransform::Modify {
ref target_schema,
ref operations,
}) => RecordBatch::try_new(
target_schema.clone(),
self.transform_columns(record_batch.columns(), operations)?,
)?,
}) => {
let options =
RecordBatchOptions::default().with_row_count(Some(record_batch.num_rows()));
RecordBatch::try_new_with_options(
target_schema.clone(),
self.transform_columns(record_batch.columns(), operations)?,
&options,
)?
}
Some(BatchTransform::ModifySchema { target_schema }) => {
record_batch.with_schema(target_schema.clone())?
}
Expand Down
30 changes: 12 additions & 18 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ impl FileIO {
/// # Arguments
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
pub async fn is_exist(&self, path: impl AsRef<str>) -> Result<bool> {
pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Ok(op.is_exist(relative_path).await?)
Ok(op.exists(relative_path).await?)
}

/// Creates input file.
Expand Down Expand Up @@ -241,10 +241,7 @@ impl InputFile {

/// Check if file exists.
pub async fn exists(&self) -> crate::Result<bool> {
Ok(self
.op
.is_exist(&self.path[self.relative_path_pos..])
.await?)
Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
}

/// Fetch and returns metadata of file.
Expand Down Expand Up @@ -323,10 +320,7 @@ impl OutputFile {

/// Checks if file exists.
pub async fn exists(&self) -> crate::Result<bool> {
Ok(self
.op
.is_exist(&self.path[self.relative_path_pos..])
.await?)
Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
}

/// Converts into [`InputFile`].
Expand Down Expand Up @@ -426,15 +420,15 @@ mod tests {
write_to_file("Iceberg loves rust.", &c_path);

let file_io = create_local_file_io();
assert!(file_io.is_exist(&a_path).await.unwrap());
assert!(file_io.exists(&a_path).await.unwrap());

file_io.remove_all(&sub_dir_path).await.unwrap();
assert!(!file_io.is_exist(&b_path).await.unwrap());
assert!(!file_io.is_exist(&c_path).await.unwrap());
assert!(file_io.is_exist(&a_path).await.unwrap());
assert!(!file_io.exists(&b_path).await.unwrap());
assert!(!file_io.exists(&c_path).await.unwrap());
assert!(file_io.exists(&a_path).await.unwrap());

file_io.delete(&a_path).await.unwrap();
assert!(!file_io.is_exist(&a_path).await.unwrap());
assert!(!file_io.exists(&a_path).await.unwrap());
}

#[tokio::test]
Expand All @@ -445,7 +439,7 @@ mod tests {
let full_path = format!("{}/{}", tmp_dir.path().to_str().unwrap(), file_name);

let file_io = create_local_file_io();
assert!(!file_io.is_exist(&full_path).await.unwrap());
assert!(!file_io.exists(&full_path).await.unwrap());
assert!(file_io.delete(&full_path).await.is_ok());
assert!(file_io.remove_all(&full_path).await.is_ok());
}
Expand Down Expand Up @@ -501,12 +495,12 @@ mod tests {
let output_file = io.new_output(&path).unwrap();
output_file.write("test".into()).await.unwrap();

assert!(io.is_exist(&path.clone()).await.unwrap());
assert!(io.exists(&path.clone()).await.unwrap());
let input_file = io.new_input(&path).unwrap();
let content = input_file.read().await.unwrap();
assert_eq!(content, Bytes::from("test"));

io.delete(&path).await.unwrap();
assert!(!io.is_exist(&path).await.unwrap());
assert!(!io.exists(&path).await.unwrap());
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
//! Currently `FileIO` provides simple methods for file operations:
//!
//! - `delete`: Delete file.
//! - `is_exist`: Check if file exists.
//! - `exists`: Check if file exists.
//! - `new_input`: Create input file for reading.
//! - `new_output`: Create output file for writing.

Expand Down
63 changes: 46 additions & 17 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
table: &'a Table,
// Empty column names means to select all columns
column_names: Vec<String>,
// Defaults to `None`, which means all columns are selected
column_names: Option<Vec<String>>,
snapshot_id: Option<i64>,
batch_size: Option<usize>,
case_sensitive: bool,
Expand All @@ -70,7 +70,7 @@ impl<'a> TableScanBuilder<'a> {

Self {
table,
column_names: vec![],
column_names: None,
snapshot_id: None,
batch_size: None,
case_sensitive: true,
Expand Down Expand Up @@ -106,16 +106,24 @@ impl<'a> TableScanBuilder<'a> {

/// Select all columns.
pub fn select_all(mut self) -> Self {
self.column_names.clear();
self.column_names = None;
self
}

/// Select no columns at all.
///
/// Sets the projection to an explicitly empty list (`Some(vec![])`). This is
/// distinct from `None`, which is the value used to mean "select all columns".
/// Like the other builder setters, it consumes and returns the builder, and it
/// overwrites any column selection made earlier.
pub fn select_empty(mut self) -> Self {
self.column_names = Some(vec![]);
self
}

/// Select some columns of the table.
pub fn select(mut self, column_names: impl IntoIterator<Item = impl ToString>) -> Self {
self.column_names = column_names
.into_iter()
.map(|item| item.to_string())
.collect();
self.column_names = Some(
column_names
.into_iter()
.map(|item| item.to_string())
.collect(),
);
self
}

Expand Down Expand Up @@ -205,8 +213,8 @@ impl<'a> TableScanBuilder<'a> {
let schema = snapshot.schema(self.table.metadata())?;

// Check that all column names exist in the schema.
if !self.column_names.is_empty() {
for column_name in &self.column_names {
if let Some(column_names) = self.column_names.as_ref() {
for column_name in column_names {
if schema.field_by_name(column_name).is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
Expand All @@ -220,7 +228,16 @@ impl<'a> TableScanBuilder<'a> {
}

let mut field_ids = vec![];
for column_name in &self.column_names {
let column_names = self.column_names.clone().unwrap_or_else(|| {
schema
.as_struct()
.fields()
.iter()
.map(|f| f.name.clone())
.collect()
});

for column_name in column_names.iter() {
let field_id = schema.field_id_by_name(column_name).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
Expand Down Expand Up @@ -297,7 +314,7 @@ pub struct TableScan {
plan_context: PlanContext,
batch_size: Option<usize>,
file_io: FileIO,
column_names: Vec<String>,
column_names: Option<Vec<String>>,
/// The maximum number of manifest files that will be
/// retrieved from [`FileIO`] concurrently
concurrency_limit_manifest_files: usize,
Expand Down Expand Up @@ -409,9 +426,10 @@ impl TableScan {
}

/// Returns a reference to the column names of the table scan.
pub fn column_names(&self) -> &[String] {
&self.column_names
pub fn column_names(&self) -> Option<&[String]> {
self.column_names.as_deref()
}

/// Returns a reference to the snapshot of the table scan.
pub fn snapshot(&self) -> &SnapshotRef {
&self.plan_context.snapshot
Expand Down Expand Up @@ -1236,23 +1254,26 @@ mod tests {
let table = TableTestFixture::new().table;

let table_scan = table.scan().select(["x", "y"]).build().unwrap();
assert_eq!(vec!["x", "y"], table_scan.column_names);
assert_eq!(
Some(vec!["x".to_string(), "y".to_string()]),
table_scan.column_names
);

let table_scan = table
.scan()
.select(["x", "y"])
.select(["z"])
.build()
.unwrap();
assert_eq!(vec!["z"], table_scan.column_names);
assert_eq!(Some(vec!["z".to_string()]), table_scan.column_names);
}

#[test]
fn test_select_all() {
let table = TableTestFixture::new().table;

let table_scan = table.scan().select_all().build().unwrap();
assert!(table_scan.column_names.is_empty());
assert!(table_scan.column_names.is_none());
}

#[test]
Expand Down Expand Up @@ -1424,6 +1445,14 @@ mod tests {
let col2 = batches[0].column_by_name("z").unwrap();
let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 3);

// test empty scan
let table_scan = fixture.table.scan().select_empty().build().unwrap();
let batch_stream = table_scan.to_arrow().await.unwrap();
let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

assert_eq!(batches[0].num_columns(), 0);
assert_eq!(batches[0].num_rows(), 1024);
}

#[tokio::test]
Expand Down
5 changes: 5 additions & 0 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,11 @@ impl ManifestEntry {
)
}

/// Status of this manifest entry.
///
/// Returns the entry's `status` field by value.
pub fn status(&self) -> ManifestStatus {
self.status
}

/// Content type of this manifest entry.
#[inline]
pub fn content_type(&self) -> DataContentType {
Expand Down
9 changes: 3 additions & 6 deletions crates/iceberg/tests/file_io_gcs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ mod tests {
#[tokio::test]
async fn gcs_exists() {
let file_io = get_file_io_gcs().await;
assert!(file_io
.is_exist(format!("{}/", get_gs_path()))
.await
.unwrap());
assert!(file_io.exists(format!("{}/", get_gs_path())).await.unwrap());
}

#[tokio::test]
Expand All @@ -108,7 +105,7 @@ mod tests {
.write(bytes::Bytes::from_static(b"iceberg-gcs!"))
.await
.expect("Write to test output file");
assert!(file_io.is_exist(gs_file).await.unwrap())
assert!(file_io.exists(gs_file).await.unwrap())
}

#[tokio::test]
Expand All @@ -120,7 +117,7 @@ mod tests {
.write(bytes::Bytes::from_static(b"iceberg!"))
.await
.expect("Write to test output file");
assert!(file_io.is_exist(&gs_file).await.unwrap());
assert!(file_io.exists(&gs_file).await.unwrap());

let input = file_io.new_input(gs_file).unwrap();
assert_eq!(input.read().await.unwrap(), Bytes::from_static(b"iceberg!"));
Expand Down
Loading

0 comments on commit fe5df3f

Please sign in to comment.