From 9fe476e02c96f907dade77e40557c5544ee5f96c Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Tue, 3 Sep 2024 13:44:37 +0100 Subject: [PATCH 01/10] feat: Add skeleton of RecordBatchEvolutionProcessor --- crates/iceberg/src/arrow/mod.rs | 2 + crates/iceberg/src/arrow/reader.rs | 21 +++- .../arrow/record_batch_evolution_processor.rs | 114 ++++++++++++++++++ crates/iceberg/src/error.rs | 6 + 4 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 crates/iceberg/src/arrow/record_batch_evolution_processor.rs diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 2076a958f..b0e0805fe 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -20,4 +20,6 @@ mod schema; pub use schema::*; mod reader; +pub(crate) mod record_batch_evolution_processor; + pub use reader::*; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index ed422be99..9e594a9e9 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -38,6 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI use parquet::file::metadata::ParquetMetaData; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; +use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::error::Result; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; @@ -209,6 +210,14 @@ impl ArrowReader { )?; record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); + // create a RecordBatchEvolutionProcessor if our task schema contains columns + // not present in the parquet file or whose types have been promoted + let record_batch_evolution_processor = RecordBatchEvolutionProcessor::build( + record_batch_stream_builder.schema(), + task.schema(), + task.project_field_ids(), + ); + if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } @@ -261,8 +270,16 @@ impl ArrowReader { // Build the batch stream and send all the RecordBatches that it generates // to the requester. let mut record_batch_stream = record_batch_stream_builder.build()?; - while let Some(batch) = record_batch_stream.try_next().await? { - tx.send(Ok(batch)).await? + + if let Some(record_batch_evolution_processor) = record_batch_evolution_processor { + while let Some(batch) = record_batch_stream.try_next().await? { + tx.send(record_batch_evolution_processor.process_record_batch(batch)) + .await? + } + } else { + while let Some(batch) = record_batch_stream.try_next().await? { + tx.send(Ok(batch)).await? 
+ } } Ok(()) diff --git a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs new file mode 100644 index 000000000..7741cbcba --- /dev/null +++ b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs @@ -0,0 +1,114 @@ +use std::cell::OnceCell; +use std::sync::Arc; + +use arrow_array::{Array as ArrowArray, ArrayRef, RecordBatch}; +use arrow_schema::{DataType, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; + +use crate::spec::Schema as IcebergSchema; +use crate::Result; + +/// Represents an operation that may need to be performed +/// to transform a RecordBatch coming from a Parquet file record +/// batch stream to match a newer Iceberg schema that has evolved from +/// the one that was used to write the parquet file. +pub(crate) enum EvolutionOp { + // signifies that a particular column has undergone type promotion, + // thus the column with the given index needs to be promoted to the + // specified type + Promote { + index: usize, + target_type: DataType, + }, + + // Signifies that a new column has been inserted before the row + // with index `index`. (we choose "before" rather than "after" so + // that we can use a usize; if we insert after, then we need to + // be able to store -1 here when we want to indicate that the new + // column is to be added at the front of the list). + // If multiple columns need to be inserted at a given + // location, they should all be given the same index, as the index + // here refers to the original record batch, not the interim state after + // a preceding operation. + Add { + index: usize, + target_type: DataType, + value: Option, // A Scalar + }, + + // signifies that a column has been renamed from one schema to the next. + // this requires no change to the data within a record batch, only to its + // schema. + Rename { + index: usize, + target_name: String, + }, // The iceberg spec refers to other permissible schema evolution actions + // (see https://iceberg.apache.org/spec/#schema-evolution): + // deleting fields and reordering fields. + // However, these actions can be achieved without needing this + // slower post-processing step by using the projection mask. +} + +pub(crate) struct RecordBatchEvolutionProcessor { + operations: Vec, + + // Every transformed RecordBatch will have the same schema. We create the + // target just once and cache it here. Helpfully, Arc is needed in + // the constructor for RecordBatch, so we don't need an expensive copy + // each time. + target_schema: OnceCell>, // Caching any columns that we need to add is harder as the number of rows + // in the record batches can vary from batch to batch within the stream, + // so rather than storing cached added columns here too, we have to + // generate them on the fly. +} + +impl RecordBatchEvolutionProcessor { + pub(crate) fn build( + source_schema: &ArrowSchemaRef, + snapshot_schema: &IcebergSchema, + projected_iceberg_field_ids: &[i32], + ) -> Option { + let operations: Vec<_> = + Self::generate_operations(source_schema, snapshot_schema, projected_iceberg_field_ids); + + if operations.is_empty() { + None + } else { + Some(Self { + operations, + target_schema: OnceCell::default(), + }) + } + } + + pub(crate) fn process_record_batch(&self, record_batch: RecordBatch) -> Result { + let new_batch_schema = self + .target_schema + .get_or_init(|| self.create_target_schema()); + + Ok(RecordBatch::try_new( + new_batch_schema.clone(), + self.transform_columns(record_batch.columns()), + )?) 
+ } + + fn generate_operations( + _source_schema: &ArrowSchemaRef, + _snapshot_schema: &IcebergSchema, + _projected_iceberg_field_ids: &[i32], + ) -> Vec { + // create the (possibly empty) list of `EvolutionOp`s that we need + // to apply to the arrays in a record batch with `source_schema` so + // that it matches the `snapshot_schema` + todo!(); + } + + fn transform_columns(&self, _columns: &[Arc]) -> Vec> { + // iterate over source_columns and self.operations, + // populating a Vec::with_capacity as we go + todo!(); + } + + fn create_target_schema(&self) -> Arc { + todo!(); + } +} diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 2b69b4706..3f50acac2 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -337,6 +337,12 @@ define_from_err!( "Failed to send a message to a channel" ); +define_from_err!( + arrow_schema::ArrowError, + ErrorKind::Unexpected, + "Arrow Schema Error" +); + define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed"); /// Converts a timestamp in milliseconds to `DateTime`, handling errors. From afc86ea5f12e50e53713516fa95f291d1f860cc1 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 4 Sep 2024 23:48:04 +0100 Subject: [PATCH 02/10] feat: Add initial implementation of RecordBatchEvolutionProcessor --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/arrow/reader.rs | 2 +- .../arrow/record_batch_evolution_processor.rs | 388 +++++++++++++++--- crates/iceberg/src/arrow/schema.rs | 3 +- 5 files changed, 346 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 82f98103e..ca14dcc0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ rust-version = "1.77.1" anyhow = "1.0.72" apache-avro = "0.17" array-init = "2" +arrow = { version = "53" } arrow-arith = { version = "53" } arrow-array = { version = "53" } arrow-ord = { version = "53" } diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 4d0160949..3a4d44648 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -44,6 +44,7 @@ tokio = ["dep:tokio"] anyhow = { workspace = true } apache-avro = { workspace = true } array-init = { workspace = true } +arrow = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true } arrow-ord = { workspace = true } diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 9e594a9e9..687834b62 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -216,7 +216,7 @@ impl ArrowReader { record_batch_stream_builder.schema(), task.schema(), task.project_field_ids(), - ); + )?; if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); diff --git a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs index 7741cbcba..e15acacf8 100644 --- a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs +++ b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs @@ -1,28 +1,39 @@ -use std::cell::OnceCell; use std::sync::Arc; -use arrow_array::{Array as ArrowArray, ArrayRef, RecordBatch}; +use arrow::compute::cast; +use arrow_array::{ + Array as ArrowArray, ArrayRef, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, + Int64Array, RecordBatch, StringArray, +}; use arrow_schema::{DataType, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; -use 
crate::spec::Schema as IcebergSchema; -use crate::Result; +use crate::arrow::schema_to_arrow_schema; +use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema}; +use crate::{Error, ErrorKind, Result}; /// Represents an operation that may need to be performed /// to transform a RecordBatch coming from a Parquet file record /// batch stream to match a newer Iceberg schema that has evolved from /// the one that was used to write the parquet file. -pub(crate) enum EvolutionOp { +#[derive(Debug)] +pub(crate) struct EvolutionOp { + index: usize, + action: EvolutionAction, +} + +#[derive(Debug)] +pub(crate) enum EvolutionAction { // signifies that a particular column has undergone type promotion, // thus the column with the given index needs to be promoted to the // specified type Promote { - index: usize, target_type: DataType, }, // Signifies that a new column has been inserted before the row // with index `index`. (we choose "before" rather than "after" so - // that we can use a usize; if we insert after, then we need to + // that we can use usize; if we insert after, then we need to // be able to store -1 here when we want to indicate that the new // column is to be added at the front of the list). // If multiple columns need to be inserted at a given @@ -30,24 +41,20 @@ pub(crate) enum EvolutionOp { // here refers to the original record batch, not the interim state after // a preceding operation. Add { - index: usize, target_type: DataType, - value: Option, // A Scalar + value: Option, }, - - // signifies that a column has been renamed from one schema to the next. - // this requires no change to the data within a record batch, only to its - // schema. - Rename { - index: usize, - target_name: String, - }, // The iceberg spec refers to other permissible schema evolution actions - // (see https://iceberg.apache.org/spec/#schema-evolution): - // deleting fields and reordering fields. - // However, these actions can be achieved without needing this - // slower post-processing step by using the projection mask. + // The iceberg spec refers to other permissible schema evolution actions + // (see https://iceberg.apache.org/spec/#schema-evolution): + // renaming fields, deleting fields and reordering fields. + // Renames only affect the RecordBatch schema rather than the + // columns themselves, so a single updated cached schema can + // be re-used and no per-column actions are required. + // Deletion and Reorder can be achieved without needing this + // post-processing step by using the projection mask. } +#[derive(Debug)] pub(crate) struct RecordBatchEvolutionProcessor { operations: Vec, @@ -55,10 +62,7 @@ pub(crate) struct RecordBatchEvolutionProcessor { // target just once and cache it here. Helpfully, Arc is needed in // the constructor for RecordBatch, so we don't need an expensive copy // each time. - target_schema: OnceCell>, // Caching any columns that we need to add is harder as the number of rows - // in the record batches can vary from batch to batch within the stream, - // so rather than storing cached added columns here too, we have to - // generate them on the fly. 
+ target_schema: Arc, } impl RecordBatchEvolutionProcessor { @@ -66,49 +70,339 @@ impl RecordBatchEvolutionProcessor { source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], - ) -> Option { + ) -> Result> { let operations: Vec<_> = - Self::generate_operations(source_schema, snapshot_schema, projected_iceberg_field_ids); + Self::generate_operations(source_schema, snapshot_schema, projected_iceberg_field_ids)?; - if operations.is_empty() { + Ok(if operations.is_empty() { None } else { Some(Self { operations, - target_schema: OnceCell::default(), + target_schema: Arc::new(schema_to_arrow_schema(snapshot_schema)?), }) - } + }) } - pub(crate) fn process_record_batch(&self, record_batch: RecordBatch) -> Result { - let new_batch_schema = self - .target_schema - .get_or_init(|| self.create_target_schema()); + fn target_schema(&self) -> Arc { + self.target_schema.clone() + } + pub(crate) fn process_record_batch(&self, record_batch: RecordBatch) -> Result { Ok(RecordBatch::try_new( - new_batch_schema.clone(), - self.transform_columns(record_batch.columns()), + self.target_schema.clone(), + self.transform_columns(record_batch.columns())?, )?) } fn generate_operations( - _source_schema: &ArrowSchemaRef, - _snapshot_schema: &IcebergSchema, - _projected_iceberg_field_ids: &[i32], - ) -> Vec { + source_schema: &ArrowSchemaRef, + snapshot_schema: &IcebergSchema, + projected_iceberg_field_ids: &[i32], + ) -> Result> { // create the (possibly empty) list of `EvolutionOp`s that we need // to apply to the arrays in a record batch with `source_schema` so // that it matches the `snapshot_schema` - todo!(); + let mut ops = vec![]; + + let mapped_arrow_schema = schema_to_arrow_schema(snapshot_schema)?; + + let mut arrow_schema_index: usize = 0; + for (projected_field_idx, &field_id) in projected_iceberg_field_ids.iter().enumerate() { + let iceberg_field = snapshot_schema.field_by_id(field_id).unwrap(); + let mapped_arrow_field = mapped_arrow_schema.field(projected_field_idx); + + let (arrow_field, add_op_required) = + if arrow_schema_index < source_schema.fields().len() { + let arrow_field = source_schema.field(arrow_schema_index); + let arrow_field_id: i32 = arrow_field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .unwrap() + .parse() + .map_err(|_| { + Error::new(ErrorKind::DataInvalid, "field id not parseable as an i32") + })?; + (Some(arrow_field), arrow_field_id != field_id) + } else { + (None, true) + }; + + if add_op_required { + let default_value = + if let Some(ref iceberg_default_value) = &iceberg_field.initial_default { + let Literal::Primitive(prim_value) = iceberg_default_value else { + panic!(); + }; + Some(prim_value.clone()) + } else { + None + }; + + ops.push(EvolutionOp { + index: arrow_schema_index, + action: EvolutionAction::Add { + value: default_value, + target_type: mapped_arrow_field.data_type().clone(), + }, + }) + } else { + if !arrow_field + .unwrap() + .data_type() + .equals_datatype(mapped_arrow_field.data_type()) + { + ops.push(EvolutionOp { + index: arrow_schema_index, + action: EvolutionAction::Promote { + target_type: mapped_arrow_field.data_type().clone(), + }, + }) + } + + arrow_schema_index += 1; + } + } + + Ok(ops) + } + + fn transform_columns( + &self, + columns: &[Arc], + ) -> Result>> { + let mut result = Vec::with_capacity(columns.len() + self.operations.len()); + let num_rows = if columns.is_empty() { + 0 + } else { + columns[0].len() + }; + + let mut col_idx = 0; + let mut op_idx = 0; + while op_idx < 
self.operations.len() || col_idx < columns.len() { + if self.operations[op_idx].index == col_idx { + match &self.operations[op_idx].action { + EvolutionAction::Add { target_type, value } => { + result.push(Self::create_column(target_type, value, num_rows)?); + } + EvolutionAction::Promote { target_type } => { + result.push(cast(&*columns[col_idx], target_type)?); + col_idx += 1; + } + } + op_idx += 1; + } else { + result.push(columns[col_idx].clone()); + col_idx += 1; + } + } + + Ok(result) + } + + fn create_column( + target_type: &DataType, + prim_lit: &Option, + num_rows: usize, + ) -> Result { + Ok(match (target_type, prim_lit) { + (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { + Arc::new(StringArray::from(vec![value.clone(); num_rows])) + } + (DataType::Utf8, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(StringArray::from(vals)) + } + (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { + Arc::new(Float32Array::from(vec![value.0; num_rows])) + } + (DataType::Float32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Float32Array::from(vals)) + } + (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => { + Arc::new(Float64Array::from(vec![value.0; num_rows])) + } + (DataType::Float64, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Float64Array::from(vals)) + } + (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { + Arc::new(Int32Array::from(vec![*value; num_rows])) + } + (DataType::Int32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Int32Array::from(vals)) + } + (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { + Arc::new(Int64Array::from(vec![*value; num_rows])) + } + (DataType::Int64, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Int64Array::from(vals)) + } + _ => { + todo!(); + } + }) + } +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{ + Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + }; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + + use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor; + use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type}; + + #[test] + fn build_returns_none_when_no_schema_migration_required() { + let snapshot_schema = iceberg_table_schema(); + let arrow_schema = arrow_schema_already_same_as_target(); + let projected_iceberg_field_ids = [10, 11, 12, 13, 14]; + + let inst = RecordBatchEvolutionProcessor::build( + &arrow_schema, + &snapshot_schema, + &projected_iceberg_field_ids, + ) + .unwrap(); + + assert!(inst.is_none()); + } + + #[test] + fn processor_returns_correct_arrow_schema_when_schema_migration_required() { + let snapshot_schema = iceberg_table_schema(); + let arrow_schema = arrow_schema_promotion_addition_and_renaming_required(); + let projected_iceberg_field_ids = [10, 11, 12, 13, 14]; + + let inst = RecordBatchEvolutionProcessor::build( + &arrow_schema, + &snapshot_schema, + &projected_iceberg_field_ids, + ) + .unwrap() + .unwrap(); + + let result = inst.target_schema(); + + assert_eq!(result, arrow_schema_already_same_as_target()); + } + + #[test] + fn processor_returns_properly_shaped_record_batch_when_schema_migration_required() { + let snapshot_schema = iceberg_table_schema(); + let arrow_schema = arrow_schema_promotion_addition_and_renaming_required(); + let projected_iceberg_field_ids = [10, 11, 12, 13, 14]; + + let 
inst = RecordBatchEvolutionProcessor::build( + &arrow_schema, + &snapshot_schema, + &projected_iceberg_field_ids, + ) + .unwrap() + .unwrap(); + + let result = inst.process_record_batch(source_record_batch()).unwrap(); + + let expected = expected_record_batch(); + + assert_eq!(result, expected); + } + + pub fn source_record_batch() -> RecordBatch { + RecordBatch::try_new( + arrow_schema_promotion_addition_and_renaming_required(), + vec![ + Arc::new(Int32Array::from(vec![Some(1001), Some(1002), Some(1003)])), + Arc::new(Float32Array::from(vec![ + Some(12.125), + Some(23.375), + Some(34.875), + ])), + Arc::new(StringArray::from(vec![ + Some("Apache"), + Some("Iceberg"), + Some("Rocks"), + ])), + ], + ) + .unwrap() + } + + pub fn expected_record_batch() -> RecordBatch { + RecordBatch::try_new(arrow_schema_already_same_as_target(), vec![ + Arc::new(StringArray::from(Vec::>::from([ + None, None, None, + ]))), + Arc::new(Int64Array::from(vec![Some(1001), Some(1002), Some(1003)])), + Arc::new(Float64Array::from(vec![ + Some(12.125), + Some(23.375), + Some(34.875), + ])), + Arc::new(StringArray::from(vec![ + Some("Apache"), + Some("Iceberg"), + Some("Rocks"), + ])), + Arc::new(StringArray::from(vec![ + Some("(╯°□°)╯"), + Some("(╯°□°)╯"), + Some("(╯°□°)╯"), + ])), + ]) + .unwrap() + } + + pub fn iceberg_table_schema() -> Schema { + Schema::builder() + .with_schema_id(2) + .with_fields(vec![ + NestedField::optional(10, "a", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(11, "b", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(12, "c", Type::Primitive(PrimitiveType::Double)).into(), + NestedField::optional(13, "d", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(14, "e", Type::Primitive(PrimitiveType::String)) + .with_initial_default(Literal::string("(╯°□°)╯")) + .into(), + ]) + .build() + .unwrap() + } + + fn arrow_schema_already_same_as_target() -> Arc { + Arc::new(ArrowSchema::new(vec![ + simple_field("a", DataType::Utf8, true, "10"), + simple_field("b", DataType::Int64, false, "11"), + simple_field("c", DataType::Float64, false, "12"), + simple_field("d", DataType::Utf8, true, "13"), + simple_field("e", DataType::Utf8, false, "14"), + ])) } - fn transform_columns(&self, _columns: &[Arc]) -> Vec> { - // iterate over source_columns and self.operations, - // populating a Vec::with_capacity as we go - todo!(); + fn arrow_schema_promotion_addition_and_renaming_required() -> Arc { + Arc::new(ArrowSchema::new(vec![ + simple_field("b", DataType::Int32, false, "11"), + simple_field("c", DataType::Float32, false, "12"), + simple_field("d_old", DataType::Utf8, true, "13"), + ])) } - fn create_target_schema(&self) -> Arc { - todo!(); + /// Create a simple arrow field with metadata. + fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field { + Field::new(name, ty, nullable).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + value.to_string(), + )])) } } diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index a32c10a22..a461ab182 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -207,7 +207,8 @@ fn visit_schema(schema: &ArrowSchema, visitor: &mut V) -> visitor.schema(schema, results) } -/// Convert Arrow schema to ceberg schema. +/// Convert Arrow schema to Iceberg schema. 
+// #[allow(dead_code)] pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result { let mut visitor = ArrowSchemaConverter::new(); visit_schema(schema, &mut visitor) From 7172cced8716e937712e5470824179ac3fd4c6eb Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 5 Sep 2024 07:56:55 +0100 Subject: [PATCH 03/10] feat: support more column types. Improve error handling. Add more comments --- .../arrow/record_batch_evolution_processor.rs | 116 ++++++++++++------ 1 file changed, 77 insertions(+), 39 deletions(-) diff --git a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs index e15acacf8..6786e8b89 100644 --- a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs +++ b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use arrow::compute::cast; use arrow_array::{ - Array as ArrowArray, ArrayRef, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, - Int64Array, RecordBatch, StringArray, + Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, + Int32Array, Int64Array, NullArray, RecordBatch, StringArray, }; use arrow_schema::{DataType, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; @@ -12,10 +12,10 @@ use crate::arrow::schema_to_arrow_schema; use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema}; use crate::{Error, ErrorKind, Result}; -/// Represents an operation that may need to be performed +/// Represents an operation that needs to be performed /// to transform a RecordBatch coming from a Parquet file record -/// batch stream to match a newer Iceberg schema that has evolved from -/// the one that was used to write the parquet file. +/// batch stream so that it conforms to an Iceberg schema that has +/// evolved from the one that was used when the file was written. #[derive(Debug)] pub(crate) struct EvolutionOp { index: usize, @@ -66,6 +66,9 @@ pub(crate) struct RecordBatchEvolutionProcessor { } impl RecordBatchEvolutionProcessor { + /// Fallibly try to build a RecordBatchEvolutionProcessor for a given parquet file schema + /// and Iceberg snapshot schema. Returns Ok(None) if the processor would not be required + /// due to the file schema already matching the snapshot schema pub(crate) fn build( source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, @@ -90,26 +93,31 @@ impl RecordBatchEvolutionProcessor { pub(crate) fn process_record_batch(&self, record_batch: RecordBatch) -> Result { Ok(RecordBatch::try_new( - self.target_schema.clone(), + self.target_schema(), self.transform_columns(record_batch.columns())?, )?) 
} + // create the (possibly empty) list of `EvolutionOp`s that we need + // to apply to the arrays in a record batch with `source_schema` so + // that it matches the `snapshot_schema` fn generate_operations( source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], ) -> Result> { - // create the (possibly empty) list of `EvolutionOp`s that we need - // to apply to the arrays in a record batch with `source_schema` so - // that it matches the `snapshot_schema` let mut ops = vec![]; let mapped_arrow_schema = schema_to_arrow_schema(snapshot_schema)?; let mut arrow_schema_index: usize = 0; for (projected_field_idx, &field_id) in projected_iceberg_field_ids.iter().enumerate() { - let iceberg_field = snapshot_schema.field_by_id(field_id).unwrap(); + let iceberg_field = snapshot_schema.field_by_id(field_id).ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "projected field id not found in snapshot schema", + ) + })?; let mapped_arrow_field = mapped_arrow_schema.field(projected_field_idx); let (arrow_field, add_op_required) = @@ -118,10 +126,18 @@ impl RecordBatchEvolutionProcessor { let arrow_field_id: i32 = arrow_field .metadata() .get(PARQUET_FIELD_ID_META_KEY) - .unwrap() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "field ID not present in parquet metadata", + ) + })? .parse() - .map_err(|_| { - Error::new(ErrorKind::DataInvalid, "field id not parseable as an i32") + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("field id not parseable as an i32: {}", e), + ) })?; (Some(arrow_field), arrow_field_id != field_id) } else { @@ -129,15 +145,19 @@ impl RecordBatchEvolutionProcessor { }; if add_op_required { - let default_value = - if let Some(ref iceberg_default_value) = &iceberg_field.initial_default { - let Literal::Primitive(prim_value) = iceberg_default_value else { - panic!(); - }; - Some(prim_value.clone()) - } else { - None + let default_value = if let Some(ref iceberg_default_value) = + &iceberg_field.initial_default + { + let Literal::Primitive(prim_value) = iceberg_default_value else { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Default value for column must be primitive type, but encountered {:?}", iceberg_default_value) + )); }; + Some(prim_value.clone()) + } else { + None + }; ops.push(EvolutionOp { index: arrow_schema_index, @@ -148,7 +168,7 @@ impl RecordBatchEvolutionProcessor { }) } else { if !arrow_field - .unwrap() + .unwrap() // will never fail as we only get here if we have Some(field) .data_type() .equals_datatype(mapped_arrow_field.data_type()) { @@ -207,12 +227,26 @@ impl RecordBatchEvolutionProcessor { num_rows: usize, ) -> Result { Ok(match (target_type, prim_lit) { - (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { - Arc::new(StringArray::from(vec![value.clone(); num_rows])) + (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => { + Arc::new(BooleanArray::from(vec![*value; num_rows])) } - (DataType::Utf8, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(StringArray::from(vals)) + (DataType::Boolean, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(BooleanArray::from(vals)) + } + (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { + Arc::new(Int32Array::from(vec![*value; num_rows])) + } + (DataType::Int32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Int32Array::from(vals)) + } + (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { + Arc::new(Int64Array::from(vec![*value; num_rows])) + } + 
(DataType::Int64, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Int64Array::from(vals)) } (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { Arc::new(Float32Array::from(vec![value.0; num_rows])) @@ -228,22 +262,26 @@ impl RecordBatchEvolutionProcessor { let vals: Vec> = vec![None; num_rows]; Arc::new(Float64Array::from(vals)) } - (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { - Arc::new(Int32Array::from(vec![*value; num_rows])) + (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { + Arc::new(StringArray::from(vec![value.clone(); num_rows])) } - (DataType::Int32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int32Array::from(vals)) + (DataType::Utf8, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(StringArray::from(vals)) } - (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { - Arc::new(Int64Array::from(vec![*value; num_rows])) + (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => { + Arc::new(BinaryArray::from_vec(vec![value; num_rows])) } - (DataType::Int64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int64Array::from(vals)) + (DataType::Binary, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(BinaryArray::from_opt_vec(vals)) } - _ => { - todo!(); + (DataType::Null, _) => Arc::new(NullArray::new(num_rows)), + (dt, _) => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("unexpected target column type {}", dt), + )) } }) } From 3d5d8c31f77fa7f3131acd5fbe129224f5e59339 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Fri, 6 Sep 2024 07:23:54 +0100 Subject: [PATCH 04/10] feat(wip): adress issues with reordered / skipped fields --- .../arrow/record_batch_evolution_processor.rs | 63 ++++++++++++++----- 1 file changed, 49 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs index 6786e8b89..9b3d20098 100644 --- a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs +++ b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs @@ -5,7 +5,7 @@ use arrow_array::{ Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, StringArray, }; -use arrow_schema::{DataType, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::schema_to_arrow_schema; @@ -108,22 +108,27 @@ impl RecordBatchEvolutionProcessor { ) -> Result> { let mut ops = vec![]; - let mapped_arrow_schema = schema_to_arrow_schema(snapshot_schema)?; + let mapped_unprojected_arrow_schema = schema_to_arrow_schema(snapshot_schema)?; + // need to create a new arrow schema here by selecting fields from mapped_unprojected, + // in the order of the field ids in projected_iceberg_field_ids - let mut arrow_schema_index: usize = 0; - for (projected_field_idx, &field_id) in projected_iceberg_field_ids.iter().enumerate() { - let iceberg_field = snapshot_schema.field_by_id(field_id).ok_or_else(|| { + // right now the below is incorrect if projected_iceberg_field_ids skips any iceberg fields + // or re-orders any + + for &projected_field_id in projected_iceberg_field_ids { + let iceberg_field = snapshot_schema.field_by_id(projected_field_id).ok_or_else(|| { Error::new( ErrorKind::Unexpected, "projected field id not found in snapshot schema", ) })?; - let 
mapped_arrow_field = mapped_arrow_schema.field(projected_field_idx); + let (mapped_arrow_field, _) = Self::get_arrow_field_with_field_id(&mapped_arrow_schema, projected_field_id)?; + let (orig_arrow_field, orig_arrow_field_idx) = Self::get_arrow_field_with_field_id(&source_schema, projected_field_id)?; let (arrow_field, add_op_required) = - if arrow_schema_index < source_schema.fields().len() { - let arrow_field = source_schema.field(arrow_schema_index); - let arrow_field_id: i32 = arrow_field + if source_schema_idx < source_schema.fields().len() { + let orig_arrow_field = source_schema.field(source_schema_idx); + let arrow_field_id: i32 = orig_arrow_field .metadata() .get(PARQUET_FIELD_ID_META_KEY) .ok_or_else(|| { @@ -139,7 +144,7 @@ impl RecordBatchEvolutionProcessor { format!("field id not parseable as an i32: {}", e), ) })?; - (Some(arrow_field), arrow_field_id != field_id) + (Some(orig_arrow_field), arrow_field_id != projected_field_id) } else { (None, true) }; @@ -160,7 +165,7 @@ impl RecordBatchEvolutionProcessor { }; ops.push(EvolutionOp { - index: arrow_schema_index, + index: source_schema_idx, action: EvolutionAction::Add { value: default_value, target_type: mapped_arrow_field.data_type().clone(), @@ -173,20 +178,50 @@ impl RecordBatchEvolutionProcessor { .equals_datatype(mapped_arrow_field.data_type()) { ops.push(EvolutionOp { - index: arrow_schema_index, + index: source_schema_idx, action: EvolutionAction::Promote { target_type: mapped_arrow_field.data_type().clone(), }, }) } - arrow_schema_index += 1; + source_schema_idx += 1; } } Ok(ops) } + fn get_arrow_field_with_field_id(arrow_schema: &ArrowSchema, field_id: i32) -> Result<(FieldRef, usize)> { + for (field, idx) in arrow_schema.fields().enumerate().iter() { + let this_field_id: i32 = field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "field ID not present in parquet metadata", + ) + })? 
+ .parse() + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("field id not parseable as an i32: {}", e), + ) + })?; + + if this_field_id == field_id { + return Ok((field.clone(), idx)) + } + } + + Err(Error::new( + ErrorKind::Unexpected, + format!("field with id {} not found in parquet schema", field_id) + )) + } + fn transform_columns( &self, columns: &[Arc], @@ -201,7 +236,7 @@ impl RecordBatchEvolutionProcessor { let mut col_idx = 0; let mut op_idx = 0; while op_idx < self.operations.len() || col_idx < columns.len() { - if self.operations[op_idx].index == col_idx { + if op_idx < self.operations.len() && self.operations[op_idx].index == col_idx { match &self.operations[op_idx].action { EvolutionAction::Add { target_type, value } => { result.push(Self::create_column(target_type, value, num_rows)?); From 3421fe1d7279dc6a59dad39a906c3ad7413ac196 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 9 Sep 2024 08:34:29 +0100 Subject: [PATCH 05/10] feat: RecordBatchEvolutionProcessor handles skipped fields in projection --- crates/iceberg/src/arrow/reader.rs | 19 +- .../arrow/record_batch_evolution_processor.rs | 426 ++++++++++-------- crates/iceberg/src/scan.rs | 27 ++ 3 files changed, 274 insertions(+), 198 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 687834b62..0bae4d808 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -212,11 +212,8 @@ impl ArrowReader { // create a RecordBatchEvolutionProcessor if our task schema contains columns // not present in the parquet file or whose types have been promoted - let record_batch_evolution_processor = RecordBatchEvolutionProcessor::build( - record_batch_stream_builder.schema(), - task.schema(), - task.project_field_ids(), - )?; + let mut record_batch_evolution_processor = + RecordBatchEvolutionProcessor::build(task.schema_ref(), task.project_field_ids()); if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); @@ -271,15 +268,9 @@ impl ArrowReader { // to the requester. let mut record_batch_stream = record_batch_stream_builder.build()?; - if let Some(record_batch_evolution_processor) = record_batch_evolution_processor { - while let Some(batch) = record_batch_stream.try_next().await? { - tx.send(record_batch_evolution_processor.process_record_batch(batch)) - .await? - } - } else { - while let Some(batch) = record_batch_stream.try_next().await? { - tx.send(Ok(batch)).await? - } + while let Some(batch) = record_batch_stream.try_next().await? { + tx.send(record_batch_evolution_processor.process_record_batch(batch)) + .await? 
} Ok(()) diff --git a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs index 9b3d20098..9e4106c14 100644 --- a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs +++ b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use arrow::compute::cast; @@ -5,7 +6,9 @@ use arrow_array::{ Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, StringArray, }; -use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use arrow_schema::{ + DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, +}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::schema_to_arrow_schema; @@ -16,19 +19,19 @@ use crate::{Error, ErrorKind, Result}; /// to transform a RecordBatch coming from a Parquet file record /// batch stream so that it conforms to an Iceberg schema that has /// evolved from the one that was used when the file was written. -#[derive(Debug)] -pub(crate) struct EvolutionOp { - index: usize, - action: EvolutionAction, -} - #[derive(Debug)] pub(crate) enum EvolutionAction { - // signifies that a particular column has undergone type promotion, - // thus the column with the given index needs to be promoted to the + // signifies that a column should be passed through unmodified + PassThrough { + source_index: usize, + }, + + // signifies particular column has undergone type promotion, and so + // the source column with the given index needs to be promoted to the // specified type Promote { target_type: DataType, + source_index: usize, }, // Signifies that a new column has been inserted before the row @@ -55,14 +58,22 @@ pub(crate) enum EvolutionAction { } #[derive(Debug)] -pub(crate) struct RecordBatchEvolutionProcessor { - operations: Vec, - +struct SchemaAndOps { // Every transformed RecordBatch will have the same schema. We create the // target just once and cache it here. Helpfully, Arc is needed in // the constructor for RecordBatch, so we don't need an expensive copy // each time. - target_schema: Arc, + pub target_schema: Arc, + + // Indicates how each column in the target schema is derived. + pub operations: Vec, +} + +#[derive(Debug)] +pub(crate) struct RecordBatchEvolutionProcessor { + snapshot_schema: Arc, + projected_iceberg_field_ids: Vec, + schema_and_ops: Option, } impl RecordBatchEvolutionProcessor { @@ -70,30 +81,68 @@ impl RecordBatchEvolutionProcessor { /// and Iceberg snapshot schema. 
Returns Ok(None) if the processor would not be required /// due to the file schema already matching the snapshot schema pub(crate) fn build( - source_schema: &ArrowSchemaRef, - snapshot_schema: &IcebergSchema, + // source_schema: &ArrowSchemaRef, + snapshot_schema: Arc, projected_iceberg_field_ids: &[i32], - ) -> Result> { - let operations: Vec<_> = - Self::generate_operations(source_schema, snapshot_schema, projected_iceberg_field_ids)?; - - Ok(if operations.is_empty() { - None + ) -> Self { + let projected_iceberg_field_ids = if projected_iceberg_field_ids.is_empty() { + // project all fields in table schema order + snapshot_schema + .as_struct() + .fields() + .iter() + .map(|field| field.id) + .collect() } else { - Some(Self { - operations, - target_schema: Arc::new(schema_to_arrow_schema(snapshot_schema)?), - }) - }) - } + projected_iceberg_field_ids.to_vec() + }; - fn target_schema(&self) -> Arc { - self.target_schema.clone() + Self { + snapshot_schema, + projected_iceberg_field_ids, + schema_and_ops: None, + } + + // let (operations, target_schema) = Self::generate_operations_and_schema( + // source_schema, + // snapshot_schema, + // projected_iceberg_field_ids, + // )?; + // + // Ok(if target_schema.as_ref() == source_schema.as_ref() { + // None + // } else { + // Some(Self { + // operations, + // target_schema, + // }) + // }) } - pub(crate) fn process_record_batch(&self, record_batch: RecordBatch) -> Result { + pub(crate) fn process_record_batch( + &mut self, + record_batch: RecordBatch, + ) -> Result { + if self.schema_and_ops.is_none() { + self.schema_and_ops = Some(Self::generate_operations_and_schema( + record_batch.schema_ref(), + self.snapshot_schema.as_ref(), + &self.projected_iceberg_field_ids, + )?); + } + + let Some(SchemaAndOps { + ref target_schema, .. + }) = self.schema_and_ops + else { + return Err(Error::new( + ErrorKind::Unexpected, + "schema_and_ops always created at this point", + )); + }; + Ok(RecordBatch::try_new( - self.target_schema(), + target_schema.clone(), self.transform_columns(record_batch.columns())?, )?) 
} @@ -101,100 +150,91 @@ impl RecordBatchEvolutionProcessor { // create the (possibly empty) list of `EvolutionOp`s that we need // to apply to the arrays in a record batch with `source_schema` so // that it matches the `snapshot_schema` - fn generate_operations( + fn generate_operations_and_schema( source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], - ) -> Result> { - let mut ops = vec![]; - - let mapped_unprojected_arrow_schema = schema_to_arrow_schema(snapshot_schema)?; - // need to create a new arrow schema here by selecting fields from mapped_unprojected, + ) -> Result { + let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); + let field_id_to_source_schema_map = + Self::build_field_id_to_arrow_schema_map(source_schema)?; + let field_id_to_mapped_schema_map = + Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?; + + // Create a new arrow schema by selecting fields from mapped_unprojected, // in the order of the field ids in projected_iceberg_field_ids - - // right now the below is incorrect if projected_iceberg_field_ids skips any iceberg fields - // or re-orders any - - for &projected_field_id in projected_iceberg_field_ids { - let iceberg_field = snapshot_schema.field_by_id(projected_field_id).ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "projected field id not found in snapshot schema", - ) - })?; - let (mapped_arrow_field, _) = Self::get_arrow_field_with_field_id(&mapped_arrow_schema, projected_field_id)?; - let (orig_arrow_field, orig_arrow_field_idx) = Self::get_arrow_field_with_field_id(&source_schema, projected_field_id)?; - - let (arrow_field, add_op_required) = - if source_schema_idx < source_schema.fields().len() { - let orig_arrow_field = source_schema.field(source_schema_idx); - let arrow_field_id: i32 = orig_arrow_field - .metadata() - .get(PARQUET_FIELD_ID_META_KEY) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "field ID not present in parquet metadata", - ) - })? - .parse() - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!("field id not parseable as an i32: {}", e), - ) - })?; - (Some(orig_arrow_field), arrow_field_id != projected_field_id) + let fields: Result> = projected_iceberg_field_ids + .iter() + .map(|field_id| { + Ok(field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? 
+ .0 + .clone()) + }) + .collect(); + let target_schema = ArrowSchema::new(fields?); + + let operations: Result> = projected_iceberg_field_ids.iter().map(|field_id|{ + let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or( + Error::new(ErrorKind::Unexpected, "could not find field in schema") + )?; + let target_type = target_field.data_type(); + + Ok(if let Some((source_field, source_index)) = field_id_to_source_schema_map.get(field_id) { + // column present in source + + if source_field.data_type().equals_datatype(target_type) { + // no promotion required + EvolutionAction::PassThrough { + source_index: *source_index + } } else { - (None, true) - }; + // promotion required + EvolutionAction::Promote { + target_type: target_type.clone(), + source_index: *source_index, + } + } + } else { + // column must be added + let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or( + Error::new(ErrorKind::Unexpected, "Field not found in snapshot schema") + )?; - if add_op_required { let default_value = if let Some(ref iceberg_default_value) = &iceberg_field.initial_default { let Literal::Primitive(prim_value) = iceberg_default_value else { return Err(Error::new( - ErrorKind::Unexpected, - format!("Default value for column must be primitive type, but encountered {:?}", iceberg_default_value) - )); + ErrorKind::Unexpected, + format!("Default value for column must be primitive type, but encountered {:?}", iceberg_default_value) + )); }; Some(prim_value.clone()) } else { None }; - ops.push(EvolutionOp { - index: source_schema_idx, - action: EvolutionAction::Add { - value: default_value, - target_type: mapped_arrow_field.data_type().clone(), - }, - }) - } else { - if !arrow_field - .unwrap() // will never fail as we only get here if we have Some(field) - .data_type() - .equals_datatype(mapped_arrow_field.data_type()) - { - ops.push(EvolutionOp { - index: source_schema_idx, - action: EvolutionAction::Promote { - target_type: mapped_arrow_field.data_type().clone(), - }, - }) + EvolutionAction::Add { + value: default_value, + target_type: target_type.clone(), } + }) + }).collect(); - source_schema_idx += 1; - } - } - - Ok(ops) + Ok(SchemaAndOps { + operations: operations?, + target_schema: Arc::new(target_schema), + }) } - fn get_arrow_field_with_field_id(arrow_schema: &ArrowSchema, field_id: i32) -> Result<(FieldRef, usize)> { - for (field, idx) in arrow_schema.fields().enumerate().iter() { - let this_field_id: i32 = field + fn build_field_id_to_arrow_schema_map( + source_schema: &SchemaRef, + ) -> Result> { + let mut field_id_to_source_schema = HashMap::new(); + for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() { + let this_field_id = source_field .metadata() .get(PARQUET_FIELD_ID_META_KEY) .ok_or_else(|| { @@ -211,49 +251,47 @@ impl RecordBatchEvolutionProcessor { ) })?; - if this_field_id == field_id { - return Ok((field.clone(), idx)) - } + field_id_to_source_schema + .insert(this_field_id, (source_field.clone(), source_field_idx)); } - Err(Error::new( - ErrorKind::Unexpected, - format!("field with id {} not found in parquet schema", field_id) - )) + Ok(field_id_to_source_schema) } fn transform_columns( &self, columns: &[Arc], ) -> Result>> { - let mut result = Vec::with_capacity(columns.len() + self.operations.len()); - let num_rows = if columns.is_empty() { - 0 - } else { - columns[0].len() + if columns.is_empty() { + return Ok(columns.to_vec()); + } + let num_rows = columns[0].len(); + + let Some(ref schema_and_ops) = self.schema_and_ops else { 
+ return Err(Error::new( + ErrorKind::Unexpected, + "schema_and_ops was None, but should be present", + )); }; - let mut col_idx = 0; - let mut op_idx = 0; - while op_idx < self.operations.len() || col_idx < columns.len() { - if op_idx < self.operations.len() && self.operations[op_idx].index == col_idx { - match &self.operations[op_idx].action { + let result: Result> = schema_and_ops + .operations + .iter() + .map(|op| { + Ok(match op { + EvolutionAction::PassThrough { source_index } => columns[*source_index].clone(), + EvolutionAction::Promote { + target_type, + source_index, + } => cast(&*columns[*source_index], target_type)?, EvolutionAction::Add { target_type, value } => { - result.push(Self::create_column(target_type, value, num_rows)?); + Self::create_column(target_type, value, num_rows)? } - EvolutionAction::Promote { target_type } => { - result.push(cast(&*columns[col_idx], target_type)?); - col_idx += 1; - } - } - op_idx += 1; - } else { - result.push(columns[col_idx].clone()); - col_idx += 1; - } - } + }) + }) + .collect(); - Ok(result) + result } fn create_column( @@ -337,57 +375,52 @@ mod test { use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type}; #[test] - fn build_returns_none_when_no_schema_migration_required() { - let snapshot_schema = iceberg_table_schema(); + fn build_field_id_to_source_schema_map_works() { let arrow_schema = arrow_schema_already_same_as_target(); - let projected_iceberg_field_ids = [10, 11, 12, 13, 14]; - let inst = RecordBatchEvolutionProcessor::build( - &arrow_schema, - &snapshot_schema, - &projected_iceberg_field_ids, - ) - .unwrap(); + let result = + RecordBatchEvolutionProcessor::build_field_id_to_arrow_schema_map(&arrow_schema) + .unwrap(); + + let expected = HashMap::from_iter([ + (10, (arrow_schema.fields()[0].clone(), 0)), + (11, (arrow_schema.fields()[1].clone(), 1)), + (12, (arrow_schema.fields()[2].clone(), 2)), + (14, (arrow_schema.fields()[3].clone(), 3)), + (15, (arrow_schema.fields()[4].clone(), 4)), + ]); - assert!(inst.is_none()); + assert!(result.eq(&expected)); } #[test] - fn processor_returns_correct_arrow_schema_when_schema_migration_required() { - let snapshot_schema = iceberg_table_schema(); - let arrow_schema = arrow_schema_promotion_addition_and_renaming_required(); - let projected_iceberg_field_ids = [10, 11, 12, 13, 14]; - - let inst = RecordBatchEvolutionProcessor::build( - &arrow_schema, - &snapshot_schema, - &projected_iceberg_field_ids, - ) - .unwrap() - .unwrap(); + fn processor_returns_properly_shaped_record_batch_when_no_schema_migration_required() { + let snapshot_schema = Arc::new(iceberg_table_schema()); + let projected_iceberg_field_ids = [13, 14]; + + let mut inst = + RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids); - let result = inst.target_schema(); + let result = inst + .process_record_batch(source_record_batch_no_migration_required()) + .unwrap(); - assert_eq!(result, arrow_schema_already_same_as_target()); + let expected = source_record_batch_no_migration_required(); + + assert_eq!(result, expected); } #[test] fn processor_returns_properly_shaped_record_batch_when_schema_migration_required() { - let snapshot_schema = iceberg_table_schema(); - let arrow_schema = arrow_schema_promotion_addition_and_renaming_required(); - let projected_iceberg_field_ids = [10, 11, 12, 13, 14]; - - let inst = RecordBatchEvolutionProcessor::build( - &arrow_schema, - &snapshot_schema, - &projected_iceberg_field_ids, - ) - .unwrap() - .unwrap(); + let snapshot_schema = 
Arc::new(iceberg_table_schema()); + let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f + + let mut inst = + RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids); let result = inst.process_record_batch(source_record_batch()).unwrap(); - let expected = expected_record_batch(); + let expected = expected_record_batch_migration_required(); assert_eq!(result, expected); } @@ -396,43 +429,59 @@ mod test { RecordBatch::try_new( arrow_schema_promotion_addition_and_renaming_required(), vec![ - Arc::new(Int32Array::from(vec![Some(1001), Some(1002), Some(1003)])), + Arc::new(Int32Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b Arc::new(Float32Array::from(vec![ Some(12.125), Some(23.375), Some(34.875), - ])), + ])), // c + Arc::new(Int32Array::from(vec![Some(2001), Some(2002), Some(2003)])), // d + Arc::new(StringArray::from(vec![ + Some("Apache"), + Some("Iceberg"), + Some("Rocks"), + ])), // e + ], + ) + .unwrap() + } + + pub fn source_record_batch_no_migration_required() -> RecordBatch { + RecordBatch::try_new( + arrow_schema_no_promotion_addition_or_renaming_required(), + vec![ + Arc::new(Int32Array::from(vec![Some(2001), Some(2002), Some(2003)])), // d Arc::new(StringArray::from(vec![ Some("Apache"), Some("Iceberg"), Some("Rocks"), - ])), + ])), // e ], ) .unwrap() } - pub fn expected_record_batch() -> RecordBatch { + pub fn expected_record_batch_migration_required() -> RecordBatch { RecordBatch::try_new(arrow_schema_already_same_as_target(), vec![ Arc::new(StringArray::from(Vec::>::from([ None, None, None, - ]))), - Arc::new(Int64Array::from(vec![Some(1001), Some(1002), Some(1003)])), + ]))), // a + Arc::new(Int64Array::from(vec![Some(1001), Some(1002), Some(1003)])), // b Arc::new(Float64Array::from(vec![ Some(12.125), Some(23.375), Some(34.875), - ])), + ])), // c Arc::new(StringArray::from(vec![ Some("Apache"), Some("Iceberg"), Some("Rocks"), - ])), + ])), // e (d skipped by projection) Arc::new(StringArray::from(vec![ Some("(╯°□°)╯"), Some("(╯°□°)╯"), Some("(╯°□°)╯"), - ])), + ])), // f ]) .unwrap() } @@ -444,8 +493,9 @@ mod test { NestedField::optional(10, "a", Type::Primitive(PrimitiveType::String)).into(), NestedField::required(11, "b", Type::Primitive(PrimitiveType::Long)).into(), NestedField::required(12, "c", Type::Primitive(PrimitiveType::Double)).into(), - NestedField::optional(13, "d", Type::Primitive(PrimitiveType::String)).into(), - NestedField::required(14, "e", Type::Primitive(PrimitiveType::String)) + NestedField::required(13, "d", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(14, "e", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(15, "f", Type::Primitive(PrimitiveType::String)) .with_initial_default(Literal::string("(╯°□°)╯")) .into(), ]) @@ -458,8 +508,8 @@ mod test { simple_field("a", DataType::Utf8, true, "10"), simple_field("b", DataType::Int64, false, "11"), simple_field("c", DataType::Float64, false, "12"), - simple_field("d", DataType::Utf8, true, "13"), - simple_field("e", DataType::Utf8, false, "14"), + simple_field("e", DataType::Utf8, true, "14"), + simple_field("f", DataType::Utf8, false, "15"), ])) } @@ -467,7 +517,15 @@ mod test { Arc::new(ArrowSchema::new(vec![ simple_field("b", DataType::Int32, false, "11"), simple_field("c", DataType::Float32, false, "12"), - simple_field("d_old", DataType::Utf8, true, "13"), + simple_field("d", DataType::Int32, false, "13"), + simple_field("e_old", DataType::Utf8, true, "14"), + ])) + } + + fn 
arrow_schema_no_promotion_addition_or_renaming_required() -> Arc { + Arc::new(ArrowSchema::new(vec![ + simple_field("d", DataType::Int32, false, "13"), + simple_field("e", DataType::Utf8, true, "14"), ])) } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index f5cbbcf06..ef0e5f542 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -906,6 +906,33 @@ pub struct FileScanTask { pub predicate: Option, } +impl FileScanTask { + /// Returns the data file path of this file scan task. + pub fn data_file_path(&self) -> &str { + &self.data_file_path + } + + /// Returns the project field id of this file scan task. + pub fn project_field_ids(&self) -> &[i32] { + &self.project_field_ids + } + + /// Returns the predicate of this file scan task. + pub fn predicate(&self) -> Option<&BoundPredicate> { + self.predicate.as_ref() + } + + /// Returns the schema of this file scan task as a reference + pub fn schema(&self) -> &Schema { + &self.schema + } + + /// Returns the schema of this file scan task as a SchemaRef + pub fn schema_ref(&self) -> SchemaRef { + self.schema.clone() + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; From 657f58bc2fd3834d3efacf464ff359d217d13cef Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 9 Sep 2024 19:10:30 +0100 Subject: [PATCH 06/10] chore: add missing license header --- .../arrow/record_batch_evolution_processor.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs index 9e4106c14..6a58cc68f 100644 --- a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs +++ b/crates/iceberg/src/arrow/record_batch_evolution_processor.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::collections::HashMap; use std::sync::Arc; From 81480b9e15718246c08305339f83ac5378fdbb4d Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 23 Sep 2024 07:08:02 +0100 Subject: [PATCH 07/10] chore: remove unneeded comment --- crates/iceberg/src/arrow/schema.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index a461ab182..e47cc9298 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -208,7 +208,6 @@ fn visit_schema(schema: &ArrowSchema, visitor: &mut V) -> } /// Convert Arrow schema to Iceberg schema. 
-// #[allow(dead_code)] pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result { let mut visitor = ArrowSchemaConverter::new(); visit_schema(schema, &mut visitor) From 0b17465c4b79478b8b86115dec5ab0e9bc6b7b45 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Mon, 23 Sep 2024 08:11:50 +0100 Subject: [PATCH 08/10] refactor: rename to RecordBatchTransformer. Improve passthrough handling --- crates/iceberg/src/arrow/mod.rs | 2 +- crates/iceberg/src/arrow/reader.rs | 13 +- ...ocessor.rs => record_batch_transformer.rs} | 191 +++++++++--------- 3 files changed, 98 insertions(+), 108 deletions(-) rename crates/iceberg/src/arrow/{record_batch_evolution_processor.rs => record_batch_transformer.rs} (79%) diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index b0e0805fe..31a892fa8 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -20,6 +20,6 @@ mod schema; pub use schema::*; mod reader; -pub(crate) mod record_batch_evolution_processor; +pub(crate) mod record_batch_transformer; pub use reader::*; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 0bae4d808..6ab635a2d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -38,7 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI use parquet::file::metadata::ParquetMetaData; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; -use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor; +use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::error::Result; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; @@ -210,10 +210,11 @@ impl ArrowReader { )?; record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); - // create a RecordBatchEvolutionProcessor if our task schema contains columns - // not present in the parquet file or whose types have been promoted - let mut record_batch_evolution_processor = - RecordBatchEvolutionProcessor::build(task.schema_ref(), task.project_field_ids()); + // RecordBatchTransformer performs any required transformations on the RecordBatches + // that come back from the file, such as type promotion, default column insertion + // and column re-ordering + let mut record_batch_transformer = + RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); @@ -269,7 +270,7 @@ impl ArrowReader { let mut record_batch_stream = record_batch_stream_builder.build()?; while let Some(batch) = record_batch_stream.try_next().await? { - tx.send(record_batch_evolution_processor.process_record_batch(batch)) + tx.send(record_batch_transformer.process_record_batch(batch)) .await? 
} diff --git a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs similarity index 79% rename from crates/iceberg/src/arrow/record_batch_evolution_processor.rs rename to crates/iceberg/src/arrow/record_batch_transformer.rs index 6a58cc68f..dc31a9c10 100644 --- a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -32,33 +32,32 @@ use crate::arrow::schema_to_arrow_schema; use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema}; use crate::{Error, ErrorKind, Result}; -/// Represents an operation that needs to be performed -/// to transform a RecordBatch coming from a Parquet file record -/// batch stream so that it conforms to an Iceberg schema that has -/// evolved from the one that was used when the file was written. +/// Indicates how a particular column in a processed RecordBatch should +/// be sourced. #[derive(Debug)] -pub(crate) enum EvolutionAction { +pub(crate) enum ColumnSource { // signifies that a column should be passed through unmodified + // from the file's RecordBatch PassThrough { source_index: usize, }, - // signifies particular column has undergone type promotion, and so - // the source column with the given index needs to be promoted to the - // specified type + // signifies that a column from the file's RecordBatch has undergone + // type promotion so the source column with the given index needs + // to be promoted to the specified type Promote { target_type: DataType, source_index: usize, }, - // Signifies that a new column has been inserted before the row + // Signifies that a new column has been inserted before the column // with index `index`. (we choose "before" rather than "after" so // that we can use usize; if we insert after, then we need to - // be able to store -1 here when we want to indicate that the new - // column is to be added at the front of the list). + // be able to store -1 here to signify that a new + // column is to be added at the front of the column list). // If multiple columns need to be inserted at a given // location, they should all be given the same index, as the index - // here refers to the original record batch, not the interim state after + // here refers to the original RecordBatch, not the interim state after // a preceding operation. Add { target_type: DataType, @@ -67,7 +66,7 @@ pub(crate) enum EvolutionAction { // The iceberg spec refers to other permissible schema evolution actions // (see https://iceberg.apache.org/spec/#schema-evolution): // renaming fields, deleting fields and reordering fields. - // Renames only affect the RecordBatch schema rather than the + // Renames only affect the schema of the RecordBatch rather than the // columns themselves, so a single updated cached schema can // be re-used and no per-column actions are required. // Deletion and Reorder can be achieved without needing this @@ -75,35 +74,45 @@ pub(crate) enum EvolutionAction { } #[derive(Debug)] -struct SchemaAndOps { - // Every transformed RecordBatch will have the same schema. We create the - // target just once and cache it here. Helpfully, Arc is needed in - // the constructor for RecordBatch, so we don't need an expensive copy - // each time. - pub target_schema: Arc, - - // Indicates how each column in the target schema is derived. 
- pub operations: Vec, +enum BatchTransform { + // Indicates that no changes need to be performed to the RecordBatches + // coming in from the stream and that they can be passed through + // unmodified + PassThrough, + + Modify { + // Every transformed RecordBatch will have the same schema. We create the + // target just once and cache it here. Helpfully, Arc is needed in + // the constructor for RecordBatch, so we don't need an expensive copy + // each time we build a new RecordBatch + target_schema: Arc, + + // Indicates how each column in the target schema is derived. + operations: Vec, + }, } #[derive(Debug)] -pub(crate) struct RecordBatchEvolutionProcessor { +pub(crate) struct RecordBatchTransformer { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - schema_and_ops: Option, + + // BatchTransform gets lazily constructed based on the schema of + // the first RecordBatch we receive from the file + batch_transform: Option, } -impl RecordBatchEvolutionProcessor { - /// Fallibly try to build a RecordBatchEvolutionProcessor for a given parquet file schema - /// and Iceberg snapshot schema. Returns Ok(None) if the processor would not be required - /// due to the file schema already matching the snapshot schema +impl RecordBatchTransformer { + /// Build a RecordBatchTransformer for a given + /// Iceberg snapshot schema and list of projected field ids. pub(crate) fn build( - // source_schema: &ArrowSchemaRef, snapshot_schema: Arc, projected_iceberg_field_ids: &[i32], ) -> Self { let projected_iceberg_field_ids = if projected_iceberg_field_ids.is_empty() { - // project all fields in table schema order + // If the list of field ids is empty, this indicates that we + // need to select all fields. + // Project all fields in table schema order snapshot_schema .as_struct() .fields() @@ -117,61 +126,47 @@ impl RecordBatchEvolutionProcessor { Self { snapshot_schema, projected_iceberg_field_ids, - schema_and_ops: None, + batch_transform: None, } - - // let (operations, target_schema) = Self::generate_operations_and_schema( - // source_schema, - // snapshot_schema, - // projected_iceberg_field_ids, - // )?; - // - // Ok(if target_schema.as_ref() == source_schema.as_ref() { - // None - // } else { - // Some(Self { - // operations, - // target_schema, - // }) - // }) } pub(crate) fn process_record_batch( &mut self, record_batch: RecordBatch, ) -> Result { - if self.schema_and_ops.is_none() { - self.schema_and_ops = Some(Self::generate_operations_and_schema( - record_batch.schema_ref(), - self.snapshot_schema.as_ref(), - &self.projected_iceberg_field_ids, - )?); - } - - let Some(SchemaAndOps { - ref target_schema, .. - }) = self.schema_and_ops - else { - return Err(Error::new( - ErrorKind::Unexpected, - "schema_and_ops always created at this point", - )); - }; - - Ok(RecordBatch::try_new( - target_schema.clone(), - self.transform_columns(record_batch.columns())?, - )?) + Ok(match self.batch_transform { + Some(BatchTransform::PassThrough) => record_batch, + Some(BatchTransform::Modify { + ref target_schema, + ref operations, + }) => RecordBatch::try_new( + target_schema.clone(), + self.transform_columns(record_batch.columns(), operations)?, + )?, + None => { + self.batch_transform = Some(Self::generate_batch_transform( + record_batch.schema_ref(), + self.snapshot_schema.as_ref(), + &self.projected_iceberg_field_ids, + )?); + + self.process_record_batch(record_batch)? 
+ } + }) } - // create the (possibly empty) list of `EvolutionOp`s that we need - // to apply to the arrays in a record batch with `source_schema` so - // that it matches the `snapshot_schema` - fn generate_operations_and_schema( + // Compare the schema of the incoming RecordBatches to the schema of + // the Iceberg snapshot to determine what, if any, transformation + // needs to be applied. If the schemas match, we return BatchTransform::PassThrough + // to indicate that no changes need to be made. Otherwise, we return a + // BatchTransform::Modify containing the target RecordBatch schema and + // the list of `ColumnSource`s that indicate how to source each column in + // the resulting RecordBatches. + fn generate_batch_transform( source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], - ) -> Result { + ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; @@ -190,7 +185,11 @@ impl RecordBatchEvolutionProcessor { .clone()) }) .collect(); + let target_schema = ArrowSchema::new(fields?); + if target_schema == *source_schema.as_ref() { + return Ok(BatchTransform::PassThrough); + } let operations: Result> = projected_iceberg_field_ids.iter().map(|field_id|{ let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or( @@ -203,12 +202,12 @@ impl RecordBatchEvolutionProcessor { if source_field.data_type().equals_datatype(target_type) { // no promotion required - EvolutionAction::PassThrough { + ColumnSource::PassThrough { source_index: *source_index } } else { // promotion required - EvolutionAction::Promote { + ColumnSource::Promote { target_type: target_type.clone(), source_index: *source_index, } @@ -222,25 +221,25 @@ impl RecordBatchEvolutionProcessor { let default_value = if let Some(ref iceberg_default_value) = &iceberg_field.initial_default { - let Literal::Primitive(prim_value) = iceberg_default_value else { + let Literal::Primitive(primitive_literal) = iceberg_default_value else { return Err(Error::new( ErrorKind::Unexpected, format!("Default value for column must be primitive type, but encountered {:?}", iceberg_default_value) )); }; - Some(prim_value.clone()) + Some(primitive_literal.clone()) } else { None }; - EvolutionAction::Add { + ColumnSource::Add { value: default_value, target_type: target_type.clone(), } }) }).collect(); - Ok(SchemaAndOps { + Ok(BatchTransform::Modify { operations: operations?, target_schema: Arc::new(target_schema), }) @@ -278,37 +277,30 @@ impl RecordBatchEvolutionProcessor { fn transform_columns( &self, columns: &[Arc], + operations: &[ColumnSource], ) -> Result>> { if columns.is_empty() { return Ok(columns.to_vec()); } let num_rows = columns[0].len(); - let Some(ref schema_and_ops) = self.schema_and_ops else { - return Err(Error::new( - ErrorKind::Unexpected, - "schema_and_ops was None, but should be present", - )); - }; - - let result: Result> = schema_and_ops - .operations + operations .iter() .map(|op| { Ok(match op { - EvolutionAction::PassThrough { source_index } => columns[*source_index].clone(), - EvolutionAction::Promote { + ColumnSource::PassThrough { source_index } => columns[*source_index].clone(), + + ColumnSource::Promote { target_type, source_index, } => cast(&*columns[*source_index], target_type)?, - EvolutionAction::Add { target_type, value } => { + + ColumnSource::Add { target_type, value } => { Self::create_column(target_type, value, 
num_rows)? } }) }) - .collect(); - - result + .collect() } fn create_column( @@ -388,7 +380,7 @@ mod test { use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; - use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor; + use crate::arrow::record_batch_transformer::RecordBatchTransformer; use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type}; #[test] @@ -396,8 +388,7 @@ mod test { let arrow_schema = arrow_schema_already_same_as_target(); let result = - RecordBatchEvolutionProcessor::build_field_id_to_arrow_schema_map(&arrow_schema) - .unwrap(); + RecordBatchTransformer::build_field_id_to_arrow_schema_map(&arrow_schema).unwrap(); let expected = HashMap::from_iter([ (10, (arrow_schema.fields()[0].clone(), 0)), @@ -415,8 +406,7 @@ mod test { let snapshot_schema = Arc::new(iceberg_table_schema()); let projected_iceberg_field_ids = [13, 14]; - let mut inst = - RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids); + let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids); let result = inst .process_record_batch(source_record_batch_no_migration_required()) @@ -432,8 +422,7 @@ mod test { let snapshot_schema = Arc::new(iceberg_table_schema()); let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f - let mut inst = - RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids); + let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids); let result = inst.process_record_batch(source_record_batch()).unwrap(); From 0ed57212fcff716ea500f3b3e55e14a7ef2b87df Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 3 Oct 2024 20:13:50 +0100 Subject: [PATCH 09/10] feat: more performant handling of case where only schema transform is required but columns can remain unmodified --- .../src/arrow/record_batch_transformer.rs | 103 +++++++++++++++--- crates/iceberg/src/lib.rs | 1 + 2 files changed, 91 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index dc31a9c10..57fca32e6 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -90,6 +90,21 @@ enum BatchTransform { // Indicates how each column in the target schema is derived. operations: Vec, }, + + // Sometimes only the schema will need modifying, for example when + // the column names have changed vs the file, but not the column types. + // we can avoid a heap allocation per RecordBach in this case by retaining + // the existing column Vec. + ModifySchema { + target_schema: Arc, + }, +} + +#[derive(Debug)] +enum SchemaComparison { + Equivalent, + NameChangesOnly, + Different, } #[derive(Debug)] @@ -134,7 +149,7 @@ impl RecordBatchTransformer { &mut self, record_batch: RecordBatch, ) -> Result { - Ok(match self.batch_transform { + Ok(match &self.batch_transform { Some(BatchTransform::PassThrough) => record_batch, Some(BatchTransform::Modify { ref target_schema, @@ -143,6 +158,9 @@ impl RecordBatchTransformer { target_schema.clone(), self.transform_columns(record_batch.columns(), operations)?, )?, + Some(BatchTransform::ModifySchema { target_schema }) => { + record_batch.with_schema(target_schema.clone())? 
+ } None => { self.batch_transform = Some(Self::generate_batch_transform( record_batch.schema_ref(), @@ -168,8 +186,6 @@ impl RecordBatchTransformer { projected_iceberg_field_ids: &[i32], ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); - let field_id_to_source_schema_map = - Self::build_field_id_to_arrow_schema_map(source_schema)?; let field_id_to_mapped_schema_map = Self::build_field_id_to_arrow_schema_map(&mapped_unprojected_arrow_schema)?; @@ -186,12 +202,78 @@ impl RecordBatchTransformer { }) .collect(); - let target_schema = ArrowSchema::new(fields?); - if target_schema == *source_schema.as_ref() { - return Ok(BatchTransform::PassThrough); + let target_schema = Arc::new(ArrowSchema::new(fields?)); + + match Self::compare_schemas(source_schema, &target_schema) { + SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough), + SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }), + SchemaComparison::Different => Ok(BatchTransform::Modify { + operations: Self::generate_transform_operations( + source_schema, + snapshot_schema, + projected_iceberg_field_ids, + field_id_to_mapped_schema_map, + )?, + target_schema, + }), + } + } + + /// Compares the source and target schemas + /// Determines if they have changed in any meaningful way: + /// * If they have different numbers of fields, then we need to modify + /// the incoming RecordBatch schema AND columns + /// * If they have the same number of fields, but some of them differ in + /// either data type or nullability, then we need to modify the + /// incoming RecordBatch schema AND columns + /// * If the schemas differ only in the column names, then we need + /// to modify the RecordBatch schema BUT we can keep the + /// original column data unmodified + /// * If the schemas are identical (or differ only in inconsequential + /// ways) then we can pass through the original RecordBatch unmodified + fn compare_schemas( + source_schema: &ArrowSchemaRef, + target_schema: &ArrowSchemaRef, + ) -> SchemaComparison { + if source_schema.fields().len() != target_schema.fields().len() { + return SchemaComparison::Different; + } + + let mut names_changed = false; + + for (source_field, target_field) in source_schema + .fields() + .iter() + .zip(target_schema.fields().iter()) + { + if source_field.data_type() != target_field.data_type() + || source_field.is_nullable() != target_field.is_nullable() + { + return SchemaComparison::Different; + } + + if source_field.name() != target_field.name() { + names_changed = true; + } } - let operations: Result> = projected_iceberg_field_ids.iter().map(|field_id|{ + if names_changed { + SchemaComparison::NameChangesOnly + } else { + SchemaComparison::Equivalent + } + } + + fn generate_transform_operations( + source_schema: &ArrowSchemaRef, + snapshot_schema: &IcebergSchema, + projected_iceberg_field_ids: &[i32], + field_id_to_mapped_schema_map: HashMap, + ) -> Result> { + let field_id_to_source_schema_map = + Self::build_field_id_to_arrow_schema_map(source_schema)?; + + projected_iceberg_field_ids.iter().map(|field_id|{ let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or( Error::new(ErrorKind::Unexpected, "could not find field in schema") )?; @@ -237,12 +319,7 @@ impl RecordBatchTransformer { target_type: target_type.clone(), } }) - }).collect(); - - Ok(BatchTransform::Modify { - operations: operations?, - target_schema: Arc::new(target_schema), - }) + }).collect() } fn build_field_id_to_arrow_schema_map( 
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index d6c5010d3..72cf18d4b 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -55,6 +55,7 @@ #[macro_use] extern crate derive_builder; +extern crate core; mod error; pub use error::{Error, ErrorKind, Result}; From 61d4bdc4140f91ad0f59e8efe5de1bd42942bd27 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 3 Oct 2024 22:21:02 +0100 Subject: [PATCH 10/10] refactor: import arrow_cast rather than arrow --- Cargo.toml | 2 +- crates/iceberg/Cargo.toml | 2 +- crates/iceberg/src/arrow/record_batch_transformer.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ca14dcc0c..5e2b89730 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,9 +39,9 @@ rust-version = "1.77.1" anyhow = "1.0.72" apache-avro = "0.17" array-init = "2" -arrow = { version = "53" } arrow-arith = { version = "53" } arrow-array = { version = "53" } +arrow-cast = { version = "53" } arrow-ord = { version = "53" } arrow-schema = { version = "53" } arrow-select = { version = "53" } diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 3a4d44648..1307cc6f3 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -44,9 +44,9 @@ tokio = ["dep:tokio"] anyhow = { workspace = true } apache-avro = { workspace = true } array-init = { workspace = true } -arrow = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true } +arrow-cast = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 57fca32e6..01ce9f0a8 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -18,11 +18,11 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow::compute::cast; use arrow_array::{ Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, StringArray, }; +use arrow_cast::cast; use arrow_schema::{ DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, };
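For illustration, a minimal sketch of how the RecordBatchTransformer introduced in this series would be driven over the RecordBatches read from a single Parquet file. The helper name `transform_file_batches` and its Vec-based signature are assumptions made here for brevity (the reader in the patches consumes an async RecordBatch stream); only `RecordBatchTransformer::build` and `process_record_batch` are taken from the patches, and the sketch assumes it is compiled inside the `iceberg` crate, since the transformer is `pub(crate)`.

use std::sync::Arc;

use arrow_array::RecordBatch;

use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::spec::Schema;
use crate::Result;

// Hypothetical helper: run every RecordBatch read from one Parquet file
// through a single transformer so they all conform to the snapshot schema.
fn transform_file_batches(
    snapshot_schema: Arc<Schema>,
    projected_field_ids: &[i32],
    batches: Vec<RecordBatch>,
) -> Result<Vec<RecordBatch>> {
    // Built once per file; whether batches are passed through untouched,
    // re-labelled with a renamed schema, or rebuilt column by column is
    // decided lazily from the schema of the first batch received.
    let mut transformer = RecordBatchTransformer::build(snapshot_schema, projected_field_ids);

    batches
        .into_iter()
        .map(|batch| transformer.process_record_batch(batch))
        .collect()
}

Because the concrete transform (pass-through, schema-only rename, or full column rewrite) is chosen once from the first batch's schema, the pass-through and rename-only cases avoid rebuilding column arrays for every subsequent batch in the file.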