Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

RecordBatchTransformer: Handle schema migration and column re-ordering in table scans #602

Merged
merged 10 commits into from
Oct 11, 2024
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ apache-avro = "0.17"
array-init = "2"
arrow-arith = { version = "53" }
arrow-array = { version = "53" }
arrow-cast = { version = "53" }
arrow-ord = { version = "53" }
arrow-schema = { version = "53" }
arrow-select = { version = "53" }
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ apache-avro = { workspace = true }
array-init = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@
mod schema;
pub use schema::*;
mod reader;
pub(crate) mod record_batch_transformer;

pub use reader::*;
11 changes: 10 additions & 1 deletion crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
use parquet::file::metadata::ParquetMetaData;
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
Expand Down Expand Up @@ -209,6 +210,12 @@ impl ArrowReader {
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any required transformations on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}
Expand Down Expand Up @@ -261,8 +268,10 @@ impl ArrowReader {
// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let mut record_batch_stream = record_batch_stream_builder.build()?;

while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(Ok(batch)).await?
tx.send(record_batch_transformer.process_record_batch(batch))
.await?
}

Ok(())
Expand Down
Loading
Loading