diff --git a/Cargo.toml b/Cargo.toml
index 82f98103e..5e2b89730 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,6 +41,7 @@ apache-avro = "0.17"
 array-init = "2"
 arrow-arith = { version = "53" }
 arrow-array = { version = "53" }
+arrow-cast = { version = "53" }
 arrow-ord = { version = "53" }
 arrow-schema = { version = "53" }
 arrow-select = { version = "53" }
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 4d0160949..1307cc6f3 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -46,6 +46,7 @@ apache-avro = { workspace = true }
 array-init = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
+arrow-cast = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index 2076a958f..31a892fa8 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -20,4 +20,6 @@ mod schema;
 pub use schema::*;
 
 mod reader;
+pub(crate) mod record_batch_transformer;
+
 pub use reader::*;
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index ed422be99..6ab635a2d 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -38,6 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
+use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
@@ -209,6 +210,12 @@ impl ArrowReader {
         )?;
         record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
 
+        // RecordBatchTransformer performs any required transformations on the RecordBatches
+        // that come back from the file, such as type promotion, default column insertion,
+        // and column re-ordering.
+        let mut record_batch_transformer =
+            RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
+
         if let Some(batch_size) = batch_size {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
         }
@@ -261,8 +268,10 @@ impl ArrowReader {
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
         let mut record_batch_stream = record_batch_stream_builder.build()?;
+
         while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(Ok(batch)).await?
+            tx.send(record_batch_transformer.process_record_batch(batch))
+                .await?
         }
 
         Ok(())
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
new file mode 100644
index 000000000..01ce9f0a8
--- /dev/null
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -0,0 +1,622 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_array::{
+    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
+    Int32Array, Int64Array, NullArray, RecordBatch, StringArray,
+};
+use arrow_cast::cast;
+use arrow_schema::{
+    DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
+};
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use crate::arrow::schema_to_arrow_schema;
+use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
+use crate::{Error, ErrorKind, Result};
+
+/// Indicates how a particular column in a processed RecordBatch should
+/// be sourced.
+#[derive(Debug)]
+pub(crate) enum ColumnSource {
+    // Signifies that a column should be passed through unmodified
+    // from the file's RecordBatch.
+    PassThrough {
+        source_index: usize,
+    },
+
+    // Signifies that a column from the file's RecordBatch has undergone
+    // type promotion, so the source column with the given index needs
+    // to be promoted (cast) to the specified type.
+    Promote {
+        target_type: DataType,
+        source_index: usize,
+    },
+
+    // Signifies that a column did not exist in the file at all and must
+    // be synthesized at this position in the target schema. The column
+    // is filled with `value` repeated for every row in the batch, or
+    // with nulls when `value` is None.
+    Add {
+        target_type: DataType,
+        value: Option<PrimitiveLiteral>,
+    },
+    // The Iceberg spec refers to other permissible schema evolution actions
+    // (see https://iceberg.apache.org/spec/#schema-evolution):
+    // renaming fields, deleting fields and reordering fields.
+    // Renames only affect the schema of the RecordBatch rather than the
+    // columns themselves, so a single updated cached schema can
+    // be re-used and no per-column actions are required.
+    // Deletion and reordering can be achieved without needing this
+    // post-processing step by using the projection mask.
+}
+
+#[derive(Debug)]
+enum BatchTransform {
+    // Indicates that no changes need to be performed to the RecordBatches
+    // coming in from the stream and that they can be passed through
+    // unmodified.
+    PassThrough,
+
+    Modify {
+        // Every transformed RecordBatch will have the same schema. We create the
+        // target just once and cache it here. Helpfully, `Arc<ArrowSchema>` is
+        // needed in the constructor for RecordBatch, so we don't need an
+        // expensive copy each time we build a new RecordBatch.
+        target_schema: Arc<ArrowSchema>,
+
+        // Indicates how each column in the target schema is derived.
+        operations: Vec<ColumnSource>,
+    },
+
+    // Sometimes only the schema will need modifying, for example when
+    // the column names have changed vs the file, but not the column types.
+    // We can avoid a heap allocation per RecordBatch in this case by retaining
+    // the existing column Vec.
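+    // (RecordBatch::with_schema lets us swap in the renamed schema while
+    // reusing the existing column data unchanged.)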
+    ModifySchema {
+        target_schema: Arc<ArrowSchema>,
+    },
+}
+
+#[derive(Debug)]
+enum SchemaComparison {
+    Equivalent,
+    NameChangesOnly,
+    Different,
+}
+
+#[derive(Debug)]
+pub(crate) struct RecordBatchTransformer {
+    snapshot_schema: Arc<IcebergSchema>,
+    projected_iceberg_field_ids: Vec<i32>,
+
+    // BatchTransform gets lazily constructed based on the schema of
+    // the first RecordBatch we receive from the file.
+    batch_transform: Option<BatchTransform>,
+}
+
+impl RecordBatchTransformer {
+    /// Build a RecordBatchTransformer for a given
+    /// Iceberg snapshot schema and list of projected field ids.
+    pub(crate) fn build(
+        snapshot_schema: Arc<IcebergSchema>,
+        projected_iceberg_field_ids: &[i32],
+    ) -> Self {
+        let projected_iceberg_field_ids = if projected_iceberg_field_ids.is_empty() {
+            // An empty list of field ids indicates that all fields should be
+            // selected, in table schema order.
+            snapshot_schema
+                .as_struct()
+                .fields()
+                .iter()
+                .map(|field| field.id)
+                .collect()
+        } else {
+            projected_iceberg_field_ids.to_vec()
+        };
+
+        Self {
+            snapshot_schema,
+            projected_iceberg_field_ids,
+            batch_transform: None,
+        }
+    }
+
+    pub(crate) fn process_record_batch(
+        &mut self,
+        record_batch: RecordBatch,
+    ) -> Result<RecordBatch> {
+        Ok(match &self.batch_transform {
+            Some(BatchTransform::PassThrough) => record_batch,
+            Some(BatchTransform::Modify {
+                ref target_schema,
+                ref operations,
+            }) => RecordBatch::try_new(
+                target_schema.clone(),
+                self.transform_columns(record_batch.columns(), operations)?,
+            )?,
+            Some(BatchTransform::ModifySchema { target_schema }) => {
+                record_batch.with_schema(target_schema.clone())?
+            }
+            None => {
+                self.batch_transform = Some(Self::generate_batch_transform(
+                    record_batch.schema_ref(),
+                    self.snapshot_schema.as_ref(),
+                    &self.projected_iceberg_field_ids,
+                )?);
+
+                self.process_record_batch(record_batch)?
+            }
+        })
+    }
+
+    // Compare the schema of the incoming RecordBatches to the schema of
+    // the Iceberg snapshot to determine what, if any, transformation
+    // needs to be applied. If the schemas match, we return BatchTransform::PassThrough
+    // to indicate that no changes need to be made. Otherwise, we return a
+    // BatchTransform::Modify containing the target RecordBatch schema and
+    // the list of `ColumnSource`s that indicate how to source each column in
+    // the resulting RecordBatches.
+    fn generate_batch_transform(
+        source_schema: &ArrowSchemaRef,
+        snapshot_schema: &IcebergSchema,
+        projected_iceberg_field_ids: &[i32],
+    ) -> Result<BatchTransform> {
+        let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
+        let field_id_to_mapped_schema_map =
+            Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?;
+
+        // Create a new arrow schema by selecting fields from mapped_unprojected,
+        // in the order of the field ids in projected_iceberg_field_ids.
+        let fields: Result<Vec<FieldRef>> = projected_iceberg_field_ids
+            .iter()
+            .map(|field_id| {
+                Ok(field_id_to_mapped_schema_map
+                    .get(field_id)
+                    .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
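+                    // FieldRef is an Arc<Field>, so this clone is cheap.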
+                    .0
+                    .clone())
+            })
+            .collect();
+
+        let target_schema = Arc::new(ArrowSchema::new(fields?));
+
+        match Self::compare_schemas(source_schema, &target_schema) {
+            SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough),
+            SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }),
+            SchemaComparison::Different => Ok(BatchTransform::Modify {
+                operations: Self::generate_transform_operations(
+                    source_schema,
+                    snapshot_schema,
+                    projected_iceberg_field_ids,
+                    field_id_to_mapped_schema_map,
+                )?,
+                target_schema,
+            }),
+        }
+    }
+
+    /// Compares the source and target schemas and determines whether they
+    /// have changed in any meaningful way:
+    /// * If they have different numbers of fields, then we need to modify
+    ///   the incoming RecordBatch schema AND columns
+    /// * If they have the same number of fields, but some of them differ in
+    ///   either data type or nullability, then we need to modify the
+    ///   incoming RecordBatch schema AND columns
+    /// * If the schemas differ only in the column names, then we need
+    ///   to modify the RecordBatch schema BUT we can keep the
+    ///   original column data unmodified
+    /// * If the schemas are identical (or differ only in inconsequential
+    ///   ways) then we can pass through the original RecordBatch unmodified
+    fn compare_schemas(
+        source_schema: &ArrowSchemaRef,
+        target_schema: &ArrowSchemaRef,
+    ) -> SchemaComparison {
+        if source_schema.fields().len() != target_schema.fields().len() {
+            return SchemaComparison::Different;
+        }
+
+        let mut names_changed = false;
+
+        for (source_field, target_field) in source_schema
+            .fields()
+            .iter()
+            .zip(target_schema.fields().iter())
+        {
+            if source_field.data_type() != target_field.data_type()
+                || source_field.is_nullable() != target_field.is_nullable()
+            {
+                return SchemaComparison::Different;
+            }
+
+            if source_field.name() != target_field.name() {
+                names_changed = true;
+            }
+        }
+
+        if names_changed {
+            SchemaComparison::NameChangesOnly
+        } else {
+            SchemaComparison::Equivalent
+        }
+    }
+
+    fn generate_transform_operations(
+        source_schema: &ArrowSchemaRef,
+        snapshot_schema: &IcebergSchema,
+        projected_iceberg_field_ids: &[i32],
+        field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
+    ) -> Result<Vec<ColumnSource>> {
+        let field_id_to_source_schema_map =
+            Self::build_field_id_to_arrow_schema_map(source_schema)?;
+
+        projected_iceberg_field_ids.iter().map(|field_id| {
+            let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
+                Error::new(ErrorKind::Unexpected, "could not find field in schema")
+            )?;
+            let target_type = target_field.data_type();
+
+            Ok(if let Some((source_field, source_index)) = field_id_to_source_schema_map.get(field_id) {
+                // column present in source
+
+                if source_field.data_type().equals_datatype(target_type) {
+                    // no promotion required
+                    ColumnSource::PassThrough {
+                        source_index: *source_index
+                    }
+                } else {
+                    // promotion required
+                    ColumnSource::Promote {
+                        target_type: target_type.clone(),
+                        source_index: *source_index,
+                    }
+                }
+            } else {
+                // column must be added
+                let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(
+                    Error::new(ErrorKind::Unexpected, "field not found in snapshot schema")
+                )?;
+
+                let default_value = if let Some(ref iceberg_default_value) =
+                    &iceberg_field.initial_default
+                {
+                    let Literal::Primitive(primitive_literal) = iceberg_default_value else {
+                        return Err(Error::new(
+                            ErrorKind::Unexpected,
+                            format!("default value for a column must be a primitive type, but encountered {:?}", iceberg_default_value)
+                        ));
+                    };
+
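+                    // Hold on to the primitive literal; create_column materializes
+                    // it into a full-length Arrow array for each batch processed.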
+                    Some(primitive_literal.clone())
+                } else {
+                    None
+                };
+
+                ColumnSource::Add {
+                    value: default_value,
+                    target_type: target_type.clone(),
+                }
+            })
+        }).collect()
+    }
+
+    fn build_field_id_to_arrow_schema_map(
+        source_schema: &SchemaRef,
+    ) -> Result<HashMap<i32, (FieldRef, usize)>> {
+        let mut field_id_to_source_schema = HashMap::new();
+        for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() {
+            let this_field_id = source_field
+                .metadata()
+                .get(PARQUET_FIELD_ID_META_KEY)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "field ID not present in parquet metadata",
+                    )
+                })?
+                .parse()
+                .map_err(|e| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("field id not parseable as an i32: {}", e),
+                    )
+                })?;
+
+            field_id_to_source_schema
+                .insert(this_field_id, (source_field.clone(), source_field_idx));
+        }
+
+        Ok(field_id_to_source_schema)
+    }
+
+    fn transform_columns(
+        &self,
+        columns: &[Arc<dyn ArrowArray>],
+        operations: &[ColumnSource],
+    ) -> Result<Vec<Arc<dyn ArrowArray>>> {
+        if columns.is_empty() {
+            return Ok(columns.to_vec());
+        }
+        let num_rows = columns[0].len();
+
+        operations
+            .iter()
+            .map(|op| {
+                Ok(match op {
+                    ColumnSource::PassThrough { source_index } => columns[*source_index].clone(),
+
+                    ColumnSource::Promote {
+                        target_type,
+                        source_index,
+                    } => cast(&*columns[*source_index], target_type)?,
+
+                    ColumnSource::Add { target_type, value } => {
+                        Self::create_column(target_type, value, num_rows)?
+                    }
+                })
+            })
+            .collect()
+    }
+
+    fn create_column(
+        target_type: &DataType,
+        prim_lit: &Option<PrimitiveLiteral>,
+        num_rows: usize,
+    ) -> Result<ArrayRef> {
+        Ok(match (target_type, prim_lit) {
+            (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => {
+                Arc::new(BooleanArray::from(vec![*value; num_rows]))
+            }
+            (DataType::Boolean, None) => {
+                let vals: Vec<Option<bool>> = vec![None; num_rows];
+                Arc::new(BooleanArray::from(vals))
+            }
+            (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => {
+                Arc::new(Int32Array::from(vec![*value; num_rows]))
+            }
+            (DataType::Int32, None) => {
+                let vals: Vec<Option<i32>> = vec![None; num_rows];
+                Arc::new(Int32Array::from(vals))
+            }
+            (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
+                Arc::new(Int64Array::from(vec![*value; num_rows]))
+            }
+            (DataType::Int64, None) => {
+                let vals: Vec<Option<i64>> = vec![None; num_rows];
+                Arc::new(Int64Array::from(vals))
+            }
+            (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => {
+                Arc::new(Float32Array::from(vec![value.0; num_rows]))
+            }
+            (DataType::Float32, None) => {
+                let vals: Vec<Option<f32>> = vec![None; num_rows];
+                Arc::new(Float32Array::from(vals))
+            }
+            (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => {
+                Arc::new(Float64Array::from(vec![value.0; num_rows]))
+            }
+            (DataType::Float64, None) => {
+                let vals: Vec<Option<f64>> = vec![None; num_rows];
+                Arc::new(Float64Array::from(vals))
+            }
+            (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
+                Arc::new(StringArray::from(vec![value.clone(); num_rows]))
+            }
+            (DataType::Utf8, None) => {
+                let vals: Vec<Option<String>> = vec![None; num_rows];
+                Arc::new(StringArray::from(vals))
+            }
+            (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => {
+                Arc::new(BinaryArray::from_vec(vec![value; num_rows]))
+            }
+            (DataType::Binary, None) => {
+                let vals: Vec<Option<&[u8]>> = vec![None; num_rows];
+                Arc::new(BinaryArray::from_opt_vec(vals))
+            }
+            (DataType::Null, _) => Arc::new(NullArray::new(num_rows)),
+            (dt, _) => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!("unexpected target column type {}", dt),
+                ))
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use arrow_array::{
+        Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
+    };
+    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+    use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+    use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type};
+
+    #[test]
+    fn build_field_id_to_source_schema_map_works() {
+        let arrow_schema = arrow_schema_already_same_as_target();
+
+        let result =
+            RecordBatchTransformer::build_field_id_to_arrow_schema_map(&arrow_schema).unwrap();
+
+        let expected = HashMap::from_iter([
+            (10, (arrow_schema.fields()[0].clone(), 0)),
+            (11, (arrow_schema.fields()[1].clone(), 1)),
+            (12, (arrow_schema.fields()[2].clone(), 2)),
+            (14, (arrow_schema.fields()[3].clone(), 3)),
+            (15, (arrow_schema.fields()[4].clone(), 4)),
+        ]);
+
+        assert!(result.eq(&expected));
+    }
+
+    #[test]
+    fn processor_returns_properly_shaped_record_batch_when_no_schema_migration_required() {
+        let snapshot_schema = Arc::new(iceberg_table_schema());
+        let projected_iceberg_field_ids = [13, 14];
+
+        let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
+
+        let result = inst
+            .process_record_batch(source_record_batch_no_migration_required())
+            .unwrap();
+
+        let expected = source_record_batch_no_migration_required();
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn processor_returns_properly_shaped_record_batch_when_schema_migration_required() {
+        let snapshot_schema = Arc::new(iceberg_table_schema());
+        let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f
+
+        let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
+
+        let result = inst.process_record_batch(source_record_batch()).unwrap();
+
+        let expected = expected_record_batch_migration_required();
+
+        assert_eq!(result, expected);
+    }
+
+    pub fn source_record_batch() -> RecordBatch {
+        RecordBatch::try_new(
+            arrow_schema_promotion_addition_and_renaming_required(),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b
+                Arc::new(Float32Array::from(vec![
+                    Some(12.125),
+                    Some(23.375),
+                    Some(34.875),
+                ])), // c
+                Arc::new(Int32Array::from(vec![Some(2001), Some(2002), Some(2003)])), // d
+                Arc::new(StringArray::from(vec![
+                    Some("Apache"),
+                    Some("Iceberg"),
+                    Some("Rocks"),
+                ])), // e
+            ],
+        )
+        .unwrap()
+    }
+
+    pub fn source_record_batch_no_migration_required() -> RecordBatch {
+        RecordBatch::try_new(
+            arrow_schema_no_promotion_addition_or_renaming_required(),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(2001), Some(2002), Some(2003)])), // d
+                Arc::new(StringArray::from(vec![
+                    Some("Apache"),
+                    Some("Iceberg"),
+                    Some("Rocks"),
+                ])), // e
+            ],
+        )
+        .unwrap()
+    }
+
+    pub fn expected_record_batch_migration_required() -> RecordBatch {
+        RecordBatch::try_new(arrow_schema_already_same_as_target(), vec![
+            Arc::new(StringArray::from(Vec::<Option<String>>::from([
+                None, None, None,
+            ]))), // a
+            Arc::new(Int64Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b
+            Arc::new(Float64Array::from(vec![
+                Some(12.125),
+                Some(23.375),
+                Some(34.875),
+            ])), // c
+            Arc::new(StringArray::from(vec![
+                Some("Apache"),
+                Some("Iceberg"),
+                Some("Rocks"),
+            ])), // e (d skipped by projection)
+            Arc::new(StringArray::from(vec![
+                Some("(╯°□°)╯"),
+                Some("(╯°□°)╯"),
+                Some("(╯°□°)╯"),
+            ])), // f
+        ])
+        .unwrap()
+    }
+
+    pub fn iceberg_table_schema() -> Schema {
+        Schema::builder()
+            .with_schema_id(2)
+            .with_fields(vec![
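+                // Field "f" (id 15) carries an initial_default, which the
+                // transformer uses to synthesize the column for data files
+                // written before "f" was added to the schema.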
+                NestedField::optional(10, "a", Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::required(11, "b", Type::Primitive(PrimitiveType::Long)).into(),
+                NestedField::required(12, "c", Type::Primitive(PrimitiveType::Double)).into(),
+                NestedField::required(13, "d", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::optional(14, "e", Type::Primitive(PrimitiveType::String)).into(),
+                NestedField::required(15, "f", Type::Primitive(PrimitiveType::String))
+                    .with_initial_default(Literal::string("(╯°□°)╯"))
+                    .into(),
+            ])
+            .build()
+            .unwrap()
+    }
+
+    fn arrow_schema_already_same_as_target() -> Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![
+            simple_field("a", DataType::Utf8, true, "10"),
+            simple_field("b", DataType::Int64, false, "11"),
+            simple_field("c", DataType::Float64, false, "12"),
+            simple_field("e", DataType::Utf8, true, "14"),
+            simple_field("f", DataType::Utf8, false, "15"),
+        ]))
+    }
+
+    fn arrow_schema_promotion_addition_and_renaming_required() -> Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![
+            simple_field("b", DataType::Int32, false, "11"),
+            simple_field("c", DataType::Float32, false, "12"),
+            simple_field("d", DataType::Int32, false, "13"),
+            simple_field("e_old", DataType::Utf8, true, "14"),
+        ]))
+    }
+
+    fn arrow_schema_no_promotion_addition_or_renaming_required() -> Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![
+            simple_field("d", DataType::Int32, false, "13"),
+            simple_field("e", DataType::Utf8, true, "14"),
+        ]))
+    }
+
+    /// Create a simple arrow field with the given name, type, nullability,
+    /// and Parquet field-id metadata.
+    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
+        Field::new(name, ty, nullable).with_metadata(HashMap::from([(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            value.to_string(),
+        )]))
+    }
+}
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index a32c10a22..e47cc9298 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -207,7 +207,7 @@ fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::T> {
     visitor.schema(schema, results)
 }
 
-/// Convert Arrow schema to ceberg schema.
+/// Convert Arrow schema to Iceberg schema.
 pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
     let mut visitor = ArrowSchemaConverter::new();
     visit_schema(schema, &mut visitor)
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 2b69b4706..3f50acac2 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -337,6 +337,12 @@ define_from_err!(
     "Failed to send a message to a channel"
 );
 
+define_from_err!(
+    arrow_schema::ArrowError,
+    ErrorKind::Unexpected,
+    "Arrow Schema Error"
+);
+
 define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");
 
 /// Converts a timestamp in milliseconds to `DateTime<Utc>`, handling errors.
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index d6c5010d3..72cf18d4b 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -55,6 +55,7 @@
 #[macro_use]
 extern crate derive_builder;
+extern crate core;
 
 mod error;
 pub use error::{Error, ErrorKind, Result};
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index f5cbbcf06..ef0e5f542 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -906,6 +906,33 @@ pub struct FileScanTask {
     pub predicate: Option<BoundPredicate>,
 }
 
+impl FileScanTask {
+    /// Returns the data file path of this file scan task.
+    pub fn data_file_path(&self) -> &str {
+        &self.data_file_path
+    }
+
+    /// Returns the projected field ids of this file scan task.
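+    /// These are Iceberg field ids, not positional column indexes.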
+    pub fn project_field_ids(&self) -> &[i32] {
+        &self.project_field_ids
+    }
+
+    /// Returns the predicate of this file scan task.
+    pub fn predicate(&self) -> Option<&BoundPredicate> {
+        self.predicate.as_ref()
+    }
+
+    /// Returns the schema of this file scan task as a reference.
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    /// Returns the schema of this file scan task as a SchemaRef.
+    pub fn schema_ref(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;