From d02ca88f1e7a97d4fc20e3d4bd33437347396dd6 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 12 Apr 2024 00:12:14 +0800 Subject: [PATCH 1/2] support iceberg schema to arrow schema --- crates/iceberg/src/arrow/schema.rs | 861 ++++++++++++++++++++++++----- 1 file changed, 725 insertions(+), 136 deletions(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 7e01b20ad..d7c628cca 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -19,10 +19,15 @@ use crate::error::Result; use crate::spec::{ - ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor, + StructType, Type, }; use crate::{Error, ErrorKind}; +use arrow_array::types::{validate_decimal_precision_and_scale, Decimal128Type}; use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use rust_decimal::prelude::ToPrimitive; +use std::collections::HashMap; use std::sync::Arc; /// A post order arrow schema visitor. @@ -198,11 +203,10 @@ pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result { visit_schema(schema, &mut visitor) } -const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id"; const ARROW_FIELD_DOC_KEY: &str = "doc"; fn get_field_id(field: &Field) -> Result { - if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) { + if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) { return value.parse::().map_err(|e| { Error::new( ErrorKind::DataInvalid, @@ -385,9 +389,221 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { } } +struct ToArrowSchemaConverter; + +enum ArrowSchemaOrFieldOrType { + Schema(ArrowSchema), + Field(Field), + Type(DataType), +} + +impl SchemaVisitor for ToArrowSchemaConverter { + type T = ArrowSchemaOrFieldOrType; + + fn schema( + &mut self, + _schema: &crate::spec::Schema, + value: ArrowSchemaOrFieldOrType, + ) -> crate::Result { + let struct_type = match value { + ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields)) => fields, + _ => unreachable!(), + }; + Ok(ArrowSchemaOrFieldOrType::Schema(ArrowSchema::new( + struct_type, + ))) + } + + fn field( + &mut self, + field: &crate::spec::NestedFieldRef, + value: ArrowSchemaOrFieldOrType, + ) -> crate::Result { + let ty = match value { + ArrowSchemaOrFieldOrType::Type(ty) => ty, + _ => unreachable!(), + }; + let mut metadata = HashMap::new(); + metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()); + if let Some(doc) = &field.doc { + metadata.insert(ARROW_FIELD_DOC_KEY.to_string(), doc.clone()); + } + Ok(ArrowSchemaOrFieldOrType::Field( + Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata), + )) + } + + fn r#struct( + &mut self, + _: &crate::spec::StructType, + results: Vec, + ) -> crate::Result { + let fields = results + .into_iter() + .map(|result| match result { + ArrowSchemaOrFieldOrType::Field(field) => field, + _ => unreachable!(), + }) + .collect(); + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct(fields))) + } + + fn list( + &mut self, + list: &crate::spec::ListType, + value: ArrowSchemaOrFieldOrType, + ) -> crate::Result { + let field = match self.field(&list.element_field, value)? { + ArrowSchemaOrFieldOrType::Field(field) => field, + _ => unreachable!(), + }; + let mut meta = HashMap::new(); + meta.insert( + PARQUET_FIELD_ID_META_KEY.to_string(), + list.element_field.id.to_string(), + ); + if let Some(doc) = &list.element_field.doc { + meta.insert(ARROW_FIELD_DOC_KEY.to_string(), doc.clone()); + } + let field = field.with_metadata(meta); + Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new( + field, + )))) + } + + fn map( + &mut self, + map: &crate::spec::MapType, + key_value: ArrowSchemaOrFieldOrType, + value: ArrowSchemaOrFieldOrType, + ) -> crate::Result { + let key_field = match self.field(&map.key_field, key_value)? { + ArrowSchemaOrFieldOrType::Field(field) => field, + _ => unreachable!(), + }; + let value_field = match self.field(&map.value_field, value)? { + ArrowSchemaOrFieldOrType::Field(field) => field, + _ => unreachable!(), + }; + let field = Field::new( + "entries", + DataType::Struct(vec![key_field, value_field].into()), + map.value_field.required, + ); + + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map( + field.into(), + false, + ))) + } + + fn primitive( + &mut self, + p: &crate::spec::PrimitiveType, + ) -> crate::Result { + match p { + crate::spec::PrimitiveType::Boolean => { + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Boolean)) + } + crate::spec::PrimitiveType::Int => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int32)), + crate::spec::PrimitiveType::Long => Ok(ArrowSchemaOrFieldOrType::Type(DataType::Int64)), + crate::spec::PrimitiveType::Float => { + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float32)) + } + crate::spec::PrimitiveType::Double => { + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Float64)) + } + crate::spec::PrimitiveType::Decimal { precision, scale } => { + let (precision, scale) = { + let precision: u8 = precision.to_owned().try_into().map_err(|err| { + Error::new( + crate::ErrorKind::DataInvalid, + "incompatible precision for decimal type convert", + ) + .with_source(err) + })?; + let scale = scale.to_owned().try_into().map_err(|err| { + Error::new( + crate::ErrorKind::DataInvalid, + "incompatible scale for decimal type convert", + ) + .with_source(err) + })?; + (precision, scale) + }; + validate_decimal_precision_and_scale::(precision, scale).map_err( + |err| { + Error::new( + crate::ErrorKind::DataInvalid, + "incompatible precision and scale for decimal type convert", + ) + .with_source(err) + }, + )?; + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Decimal128( + precision, scale, + ))) + } + crate::spec::PrimitiveType::Date => { + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32)) + } + crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type( + DataType::Time32(TimeUnit::Microsecond), + )), + crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type( + DataType::Timestamp(TimeUnit::Microsecond, None), + )), + crate::spec::PrimitiveType::Timestamptz => Ok(ArrowSchemaOrFieldOrType::Type( + // Timestampz always stored as UTC + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + )), + crate::spec::PrimitiveType::String => { + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Utf8)) + } + crate::spec::PrimitiveType::Uuid => Ok(ArrowSchemaOrFieldOrType::Type( + DataType::FixedSizeBinary(16), + )), + crate::spec::PrimitiveType::Fixed(len) => Ok(ArrowSchemaOrFieldOrType::Type( + len.to_i32() + .map(DataType::FixedSizeBinary) + .unwrap_or(DataType::LargeBinary), + )), + crate::spec::PrimitiveType::Binary => { + Ok(ArrowSchemaOrFieldOrType::Type(DataType::LargeBinary)) + } + } + } +} + +/// Convert iceberg schema to an arrow schema. +pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result { + let mut converter = ToArrowSchemaConverter; + match crate::spec::visit_schema(schema, &mut converter)? { + ArrowSchemaOrFieldOrType::Schema(schema) => Ok(schema), + _ => unreachable!(), + } +} + +impl TryFrom<&ArrowSchema> for crate::spec::Schema { + type Error = Error; + + fn try_from(schema: &ArrowSchema) -> crate::Result { + arrow_schema_to_schema(schema) + } +} + +impl TryFrom<&crate::spec::Schema> for ArrowSchema { + type Error = Error; + + fn try_from(schema: &crate::spec::Schema) -> crate::Result { + schema_to_arrow_schema(schema) + } +} + #[cfg(test)] mod tests { use super::*; + use crate::spec::Schema; use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::Schema as ArrowSchema; @@ -395,15 +611,14 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; - #[test] - fn test_arrow_schema_to_schema() { + fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema { let fields = Fields::from(vec![ Field::new("key", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), + PARQUET_FIELD_ID_META_KEY.to_string(), "17".to_string(), )])), Field::new("value", DataType::Utf8, true).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), + PARQUET_FIELD_ID_META_KEY.to_string(), "18".to_string(), )])), ]); @@ -412,7 +627,7 @@ mod tests { let map = DataType::Map( Arc::new( Field::new("entries", r#struct, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), + PARQUET_FIELD_ID_META_KEY.to_string(), "19".to_string(), )])), ), @@ -421,11 +636,11 @@ mod tests { let fields = Fields::from(vec![ Field::new("aa", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), + PARQUET_FIELD_ID_META_KEY.to_string(), "18".to_string(), )])), Field::new("bb", DataType::Utf8, true).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), + PARQUET_FIELD_ID_META_KEY.to_string(), "19".to_string(), )])), Field::new( @@ -434,141 +649,145 @@ mod tests { false, ) .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), + PARQUET_FIELD_ID_META_KEY.to_string(), "20".to_string(), )])), ]); let r#struct = DataType::Struct(fields); - let schema = - ArrowSchema::new(vec![ - Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "2".to_string(), - )])), - Field::new("b", DataType::Int64, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "1".to_string(), - )])), - Field::new("c", DataType::Utf8, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "3".to_string(), - )])), - Field::new("n", DataType::LargeUtf8, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "21".to_string(), - )])), - Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None), true) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "4".to_string(), + ArrowSchema::new(vec![ + Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("b", DataType::Int64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("c", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + Field::new("n", DataType::LargeUtf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "21".to_string(), + )])), + Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None), true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]), + ), + Field::new("e", DataType::Boolean, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )])), + Field::new("f", DataType::Float32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )])), + Field::new("g", DataType::Float64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + Field::new("p", DataType::Decimal128(10, 2), false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "27".to_string(), + )])), + Field::new("h", DataType::Date32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "8".to_string(), + )])), + Field::new("i", DataType::Time64(TimeUnit::Microsecond), false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "9".to_string())]), + ), + Field::new( + "j", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "10".to_string(), + )])), + Field::new( + "k", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "12".to_string(), + )])), + Field::new("l", DataType::Binary, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "13".to_string(), + )])), + Field::new("o", DataType::LargeBinary, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "22".to_string(), + )])), + Field::new("m", DataType::FixedSizeBinary(10), false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "11".to_string(), + )])), + Field::new( + "list", + DataType::List(Arc::new( + Field::new("element", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "15".to_string(), )])), - Field::new("e", DataType::Boolean, true).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "6".to_string(), - )])), - Field::new("f", DataType::Float32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "5".to_string(), - )])), - Field::new("g", DataType::Float64, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "7".to_string(), - )])), - Field::new("p", DataType::Decimal128(10, 2), false).with_metadata(HashMap::from([ - (ARROW_FIELD_ID_KEY.to_string(), "27".to_string()), - ])), - Field::new("h", DataType::Date32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "8".to_string(), - )])), - Field::new("i", DataType::Time64(TimeUnit::Microsecond), false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "9".to_string())]), - ), - Field::new( - "j", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - false, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "10".to_string(), - )])), - Field::new( - "k", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - false, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "12".to_string(), - )])), - Field::new("l", DataType::Binary, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "13".to_string(), - )])), - Field::new("o", DataType::LargeBinary, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "22".to_string(), - )])), - Field::new("m", DataType::FixedSizeBinary(10), false).with_metadata(HashMap::from( - [(ARROW_FIELD_ID_KEY.to_string(), "11".to_string())], )), - Field::new( - "list", - DataType::List(Arc::new( - Field::new("element", DataType::Int32, false).with_metadata(HashMap::from( - [(ARROW_FIELD_ID_KEY.to_string(), "15".to_string())], - )), - )), - true, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "14".to_string(), - )])), - Field::new( - "large_list", - DataType::LargeList(Arc::new( - Field::new("element", DataType::Utf8, false).with_metadata(HashMap::from( - [(ARROW_FIELD_ID_KEY.to_string(), "23".to_string())], - )), - )), - true, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "24".to_string(), - )])), - Field::new( - "fixed_list", - DataType::FixedSizeList( - Arc::new( - Field::new("element", DataType::Binary, false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "26".to_string())]), - ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "14".to_string(), + )])), + Field::new( + "large_list", + DataType::LargeList(Arc::new( + Field::new("element", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "23".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "24".to_string(), + )])), + Field::new( + "fixed_list", + DataType::FixedSizeList( + Arc::new( + Field::new("element", DataType::Binary, false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "26".to_string(), + )]), ), - 10, ), - true, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "25".to_string(), - )])), - Field::new("map", map, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "16".to_string(), - )])), - Field::new("struct", r#struct, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "17".to_string(), - )])), - ]); - let schema = Arc::new(schema); - let result = arrow_schema_to_schema(&schema).unwrap(); + 10, + ), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "25".to_string(), + )])), + Field::new("map", map, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "16".to_string(), + )])), + Field::new("struct", r#struct, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "17".to_string(), + )])), + ]) + } + fn iceberg_schema_for_arrow_schema_to_schema_test() -> Schema { let schema_json = r#"{ "type":"struct", "schema-id":0, @@ -747,7 +966,377 @@ mod tests { "identifier-field-ids":[] }"#; - let expected_type: Schema = serde_json::from_str(schema_json).unwrap(); - assert_eq!(result, expected_type); + let schema: Schema = serde_json::from_str(schema_json).unwrap(); + schema + } + + #[test] + fn test_arrow_schema_to_schema() { + let arrow_schema = arrow_schema_for_arrow_schema_to_schema_test(); + let schema = iceberg_schema_for_arrow_schema_to_schema_test(); + let converted_schema = arrow_schema_to_schema(&arrow_schema).unwrap(); + assert_eq!(converted_schema, schema); + } + + fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema { + let fields = Fields::from(vec![ + Field::new("key", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "17".to_string(), + )])), + Field::new("value", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "18".to_string(), + )])), + ]); + + let r#struct = DataType::Struct(fields); + let map = DataType::Map(Arc::new(Field::new("entries", r#struct, false)), false); + + let fields = Fields::from(vec![ + Field::new("aa", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "18".to_string(), + )])), + Field::new("bb", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "19".to_string(), + )])), + Field::new( + "cc", + DataType::Timestamp(TimeUnit::Microsecond, None), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "20".to_string(), + )])), + ]); + + let r#struct = DataType::Struct(fields); + + ArrowSchema::new(vec![ + Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("b", DataType::Int64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("c", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + Field::new("n", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "21".to_string(), + )])), + Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None), true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]), + ), + Field::new("e", DataType::Boolean, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )])), + Field::new("f", DataType::Float32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )])), + Field::new("g", DataType::Float64, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + Field::new("p", DataType::Decimal128(10, 2), false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "27".to_string(), + )])), + Field::new("h", DataType::Date32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "8".to_string(), + )])), + Field::new("i", DataType::Time32(TimeUnit::Microsecond), false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "9".to_string())]), + ), + Field::new( + "j", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "10".to_string(), + )])), + Field::new( + "k", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "12".to_string(), + )])), + Field::new("l", DataType::LargeBinary, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "13".to_string(), + )])), + Field::new("o", DataType::LargeBinary, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "22".to_string(), + )])), + Field::new("m", DataType::FixedSizeBinary(10), false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "11".to_string(), + )])), + Field::new( + "list", + DataType::List(Arc::new( + Field::new("element", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "15".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "14".to_string(), + )])), + Field::new( + "large_list", + DataType::List(Arc::new( + Field::new("element", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "23".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "24".to_string(), + )])), + Field::new( + "fixed_list", + DataType::List(Arc::new( + Field::new("element", DataType::LargeBinary, false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "26".to_string())]), + ), + )), + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "25".to_string(), + )])), + Field::new("map", map, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "16".to_string(), + )])), + Field::new("struct", r#struct, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "17".to_string(), + )])), + Field::new("uuid", DataType::FixedSizeBinary(16), false).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "26".to_string())], + )), + ]) + } + + fn iceberg_schema_for_schema_to_arrow_schema() -> Schema { + let schema_json = r#"{ + "type":"struct", + "schema-id":0, + "fields":[ + { + "id":2, + "name":"a", + "required":true, + "type":"int" + }, + { + "id":1, + "name":"b", + "required":true, + "type":"long" + }, + { + "id":3, + "name":"c", + "required":true, + "type":"string" + }, + { + "id":21, + "name":"n", + "required":true, + "type":"string" + }, + { + "id":4, + "name":"d", + "required":false, + "type":"timestamp" + }, + { + "id":6, + "name":"e", + "required":false, + "type":"boolean" + }, + { + "id":5, + "name":"f", + "required":true, + "type":"float" + }, + { + "id":7, + "name":"g", + "required":true, + "type":"double" + }, + { + "id":27, + "name":"p", + "required":true, + "type":"decimal(10,2)" + }, + { + "id":8, + "name":"h", + "required":true, + "type":"date" + }, + { + "id":9, + "name":"i", + "required":true, + "type":"time" + }, + { + "id":10, + "name":"j", + "required":true, + "type":"timestamptz" + }, + { + "id":12, + "name":"k", + "required":true, + "type":"timestamptz" + }, + { + "id":13, + "name":"l", + "required":true, + "type":"binary" + }, + { + "id":22, + "name":"o", + "required":true, + "type":"binary" + }, + { + "id":11, + "name":"m", + "required":true, + "type":"fixed[10]" + }, + { + "id":14, + "name":"list", + "required": false, + "type": { + "type": "list", + "element-id": 15, + "element-required": true, + "element": "int" + } + }, + { + "id":24, + "name":"large_list", + "required": false, + "type": { + "type": "list", + "element-id": 23, + "element-required": true, + "element": "string" + } + }, + { + "id":25, + "name":"fixed_list", + "required": false, + "type": { + "type": "list", + "element-id": 26, + "element-required": true, + "element": "binary" + } + }, + { + "id":16, + "name":"map", + "required": true, + "type": { + "type": "map", + "key-id": 17, + "key": "int", + "value-id": 18, + "value-required": false, + "value": "string" + } + }, + { + "id":17, + "name":"struct", + "required": true, + "type": { + "type": "struct", + "fields": [ + { + "id":18, + "name":"aa", + "required":true, + "type":"int" + }, + { + "id":19, + "name":"bb", + "required":false, + "type":"string" + }, + { + "id":20, + "name":"cc", + "required":true, + "type":"timestamp" + } + ] + } + }, + { + "id":26, + "name":"uuid", + "required":true, + "type":"uuid" + } + ], + "identifier-field-ids":[] + }"#; + + let schema: Schema = serde_json::from_str(schema_json).unwrap(); + schema + } + + #[test] + fn test_schema_to_arrow_schema() { + let arrow_schema = arrow_schema_for_schema_to_arrow_schema_test(); + let schema = iceberg_schema_for_schema_to_arrow_schema(); + let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap(); + assert_eq!(converted_arrow_schema, arrow_schema); } } From 6136357e9cf62a930d1d60df55e937dbb7ad2ed9 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 19 Apr 2024 16:05:36 +0800 Subject: [PATCH 2/2] avoid realloc hashmap --- crates/iceberg/src/arrow/schema.rs | 35 +++++++++++++++++++----------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index d7c628cca..c7e870096 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -423,11 +423,14 @@ impl SchemaVisitor for ToArrowSchemaConverter { ArrowSchemaOrFieldOrType::Type(ty) => ty, _ => unreachable!(), }; - let mut metadata = HashMap::new(); - metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()); - if let Some(doc) = &field.doc { - metadata.insert(ARROW_FIELD_DOC_KEY.to_string(), doc.clone()); - } + let metadata = if let Some(doc) = &field.doc { + HashMap::from([ + (PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string()), + (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()), + ]) + } else { + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field.id.to_string())]) + }; Ok(ArrowSchemaOrFieldOrType::Field( Field::new(field.name.clone(), ty, !field.required).with_metadata(metadata), )) @@ -457,14 +460,20 @@ impl SchemaVisitor for ToArrowSchemaConverter { ArrowSchemaOrFieldOrType::Field(field) => field, _ => unreachable!(), }; - let mut meta = HashMap::new(); - meta.insert( - PARQUET_FIELD_ID_META_KEY.to_string(), - list.element_field.id.to_string(), - ); - if let Some(doc) = &list.element_field.doc { - meta.insert(ARROW_FIELD_DOC_KEY.to_string(), doc.clone()); - } + let meta = if let Some(doc) = &list.element_field.doc { + HashMap::from([ + ( + PARQUET_FIELD_ID_META_KEY.to_string(), + list.element_field.id.to_string(), + ), + (ARROW_FIELD_DOC_KEY.to_string(), doc.clone()), + ]) + } else { + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + list.element_field.id.to_string(), + )]) + }; let field = field.with_metadata(meta); Ok(ArrowSchemaOrFieldOrType::Type(DataType::List(Arc::new( field,