From 12e12e274571d7aa75c023f6bcd5164984041881 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 20 Sep 2024 19:59:55 +0800 Subject: [PATCH] feat: expose arrow type <-> iceberg type (#637) * feat: expose arrow type <-> iceberg type Previously we only exposed the schema conversion. Signed-off-by: xxchan * add tests Signed-off-by: xxchan --------- Signed-off-by: xxchan --- crates/iceberg/src/arrow/schema.rs | 108 +++++++++++++++++++++++++++-- 1 file changed, 102 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 2ff43e0f0..08600664e 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -171,7 +171,6 @@ fn visit_type(r#type: &DataType, visitor: &mut V) -> Resu } /// Visit list types in post order. -#[allow(dead_code)] fn visit_list( data_type: &DataType, element_field: &Field, @@ -184,7 +183,6 @@ fn visit_list( } /// Visit struct type in post order. -#[allow(dead_code)] fn visit_struct(fields: &Fields, visitor: &mut V) -> Result { let mut results = Vec::with_capacity(fields.len()); for field in fields { @@ -198,7 +196,6 @@ fn visit_struct(fields: &Fields, visitor: &mut V) -> Resu } /// Visit schema in post order. -#[allow(dead_code)] fn visit_schema(schema: &ArrowSchema, visitor: &mut V) -> Result { let mut results = Vec::with_capacity(schema.fields().len()); for field in schema.fields() { @@ -211,12 +208,17 @@ fn visit_schema(schema: &ArrowSchema, visitor: &mut V) -> } /// Convert Arrow schema to ceberg schema. -#[allow(dead_code)] pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result { let mut visitor = ArrowSchemaConverter::new(); visit_schema(schema, &mut visitor) } +/// Convert Arrow type to iceberg type. 
+pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> { + let mut visitor = ArrowSchemaConverter::new(); + visit_type(ty, &mut visitor) +} + const ARROW_FIELD_DOC_KEY: &str = "doc"; fn get_field_id(field: &Field) -> Result { @@ -246,7 +248,6 @@ fn get_field_doc(field: &Field) -> Option { struct ArrowSchemaConverter; impl ArrowSchemaConverter { - #[allow(dead_code)] fn new() -> Self { Self {} } @@ -615,6 +616,15 @@ pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<Arr } } +/// Convert iceberg type to an arrow type. +pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> { + let mut converter = ToArrowSchemaConverter; + match crate::spec::visit_type(ty, &mut converter)? { + ArrowSchemaOrFieldOrType::Type(ty) => Ok(ty), + _ => unreachable!(), + } +} + /// Convert Iceberg Datum to Arrow Datum. pub(crate) fn get_arrow_datum(datum: &Datum) -> Result> { match (datum.data_type(), datum.literal()) { @@ -779,7 +789,7 @@ mod tests { use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; use super::*; - use crate::spec::Schema; + use crate::spec::{Literal, Schema}; /// Create a simple field with metadata. 
fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field { @@ -1365,4 +1375,90 @@ mod tests { let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap(); assert_eq!(converted_arrow_schema, arrow_schema); } + + #[test] + fn test_type_conversion() { + // test primitive type + { + let arrow_type = DataType::Int32; + let iceberg_type = Type::Primitive(PrimitiveType::Int); + assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap()); + assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap()); + } + + // test struct type + { + // struct fields without field-id metadata cause an error + let arrow_type = DataType::Struct(Fields::from(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Utf8, true), + ])); + assert_eq!( + &arrow_type_to_type(&arrow_type).unwrap_err().to_string(), + "DataInvalid => Field id not found in metadata" + ); + + let arrow_type = DataType::Struct(Fields::from(vec![ + Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ])); + let iceberg_type = Type::Struct(StructType::new(vec![ + NestedField { + id: 1, + doc: None, + name: "a".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Long)), + initial_default: None, + write_default: None, + } + .into(), + NestedField { + id: 2, + doc: None, + name: "b".to_string(), + required: false, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + initial_default: None, + write_default: None, + } + .into(), + ])); + assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap()); + assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap()); + + // initial_default and write_default are ignored + let iceberg_type = Type::Struct(StructType::new(vec![ + NestedField { + id: 1, + doc: 
None, + name: "a".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Long)), + initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))), + write_default: None, + } + .into(), + NestedField { + id: 2, + doc: None, + name: "b".to_string(), + required: false, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + initial_default: None, + write_default: Some(Literal::Primitive(PrimitiveLiteral::String( + "514".to_string(), + ))), + } + .into(), + ])); + assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap()); + } + } }