Skip to content

Commit

Permalink
feat: expose arrow type <-> iceberg type (apache#637)
Browse files Browse the repository at this point in the history
* feat: expose arrow type <-> iceberg type

Previously we only exposed the schema conversion.

Signed-off-by: xxchan <xxchan22f@gmail.com>

* add tests

Signed-off-by: xxchan <xxchan22f@gmail.com>

---------

Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan authored Sep 20, 2024
1 parent 3b27c9e commit 12e12e2
Showing 1 changed file with 102 additions and 6 deletions.
108 changes: 102 additions & 6 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> Resu
}

/// Visit list types in post order.
#[allow(dead_code)]
fn visit_list<V: ArrowSchemaVisitor>(
data_type: &DataType,
element_field: &Field,
Expand All @@ -184,7 +183,6 @@ fn visit_list<V: ArrowSchemaVisitor>(
}

/// Visit struct type in post order.
#[allow(dead_code)]
fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Result<V::T> {
let mut results = Vec::with_capacity(fields.len());
for field in fields {
Expand All @@ -198,7 +196,6 @@ fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> Resu
}

/// Visit schema in post order.
#[allow(dead_code)]
fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) -> Result<V::U> {
let mut results = Vec::with_capacity(schema.fields().len());
for field in schema.fields() {
Expand All @@ -211,12 +208,17 @@ fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) ->
}

/// Convert an Arrow schema to an Iceberg schema.
///
/// The Arrow schema is walked in post order by a fresh `ArrowSchemaConverter`;
/// any error returned by the walk is propagated to the caller unchanged.
#[allow(dead_code)]
pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
    visit_schema(schema, &mut ArrowSchemaConverter::new())
}

/// Convert a single Arrow data type to the corresponding Iceberg type.
///
/// This is the type-level counterpart of `arrow_schema_to_schema`. Nested
/// fields (for example the children of a struct) must carry a field id in
/// their metadata, otherwise the conversion returns an error.
pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
    visit_type(ty, &mut ArrowSchemaConverter::new())
}

const ARROW_FIELD_DOC_KEY: &str = "doc";

fn get_field_id(field: &Field) -> Result<i32> {
Expand Down Expand Up @@ -246,7 +248,6 @@ fn get_field_doc(field: &Field) -> Option<String> {
struct ArrowSchemaConverter;

impl ArrowSchemaConverter {
#[allow(dead_code)]
fn new() -> Self {
Self {}
}
Expand Down Expand Up @@ -615,6 +616,15 @@ pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<Arr
}
}

/// Convert a single Iceberg type to the corresponding Arrow data type.
///
/// # Errors
/// Propagates any error returned while visiting `ty`.
///
/// # Panics
/// Hits `unreachable!()` if the visitor yields anything other than the
/// `Type` variant of `ArrowSchemaOrFieldOrType`.
pub fn type_to_arrow_type(ty: &crate::spec::Type) -> crate::Result<DataType> {
    let mut visitor = ToArrowSchemaConverter;
    let visited = crate::spec::visit_type(ty, &mut visitor)?;
    if let ArrowSchemaOrFieldOrType::Type(arrow_type) = visited {
        Ok(arrow_type)
    } else {
        unreachable!()
    }
}

/// Convert Iceberg Datum to Arrow Datum.
pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> {
match (datum.data_type(), datum.literal()) {
Expand Down Expand Up @@ -779,7 +789,7 @@ mod tests {
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};

use super::*;
use crate::spec::Schema;
use crate::spec::{Literal, Schema};

/// Create a simple field with metadata.
fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
Expand Down Expand Up @@ -1365,4 +1375,90 @@ mod tests {
let converted_arrow_schema = schema_to_arrow_schema(&schema).unwrap();
assert_eq!(converted_arrow_schema, arrow_schema);
}

// Checks the single-type conversions `arrow_type_to_type` and
// `type_to_arrow_type` in both directions, for a primitive type and for a
// struct type.
#[test]
fn test_type_conversion() {
// Primitive type: Arrow Int32 <-> Iceberg Int, converted in both directions.
{
let arrow_type = DataType::Int32;
let iceberg_type = Type::Primitive(PrimitiveType::Int);
assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
}

// Struct type.
{
// Struct fields without field-id metadata make the Arrow -> Iceberg
// conversion fail.
let arrow_type = DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Utf8, true),
]));
assert_eq!(
&arrow_type_to_type(&arrow_type).unwrap_err().to_string(),
"DataInvalid => Field id not found in metadata"
);

// Same struct, now with a field id attached to each field's metadata.
let arrow_type = DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Int64, false).with_metadata(HashMap::from_iter([(
PARQUET_FIELD_ID_META_KEY.to_string(),
1.to_string(),
)])),
Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from_iter([(
PARQUET_FIELD_ID_META_KEY.to_string(),
2.to_string(),
)])),
]));
// Expected Iceberg struct: a non-nullable Arrow field maps to `required: true`,
// a nullable one to `required: false`.
let iceberg_type = Type::Struct(StructType::new(vec![
NestedField {
id: 1,
doc: None,
name: "a".to_string(),
required: true,
field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
initial_default: None,
write_default: None,
}
.into(),
NestedField {
id: 2,
doc: None,
name: "b".to_string(),
required: false,
field_type: Box::new(Type::Primitive(PrimitiveType::String)),
initial_default: None,
write_default: None,
}
.into(),
]));
assert_eq!(iceberg_type, arrow_type_to_type(&arrow_type).unwrap());
assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());

// initial_default and write_default are ignored when converting to Arrow:
// the struct below differs from the one above only in its defaults and
// must still produce the same Arrow type.
let iceberg_type = Type::Struct(StructType::new(vec![
NestedField {
id: 1,
doc: None,
name: "a".to_string(),
required: true,
field_type: Box::new(Type::Primitive(PrimitiveType::Long)),
initial_default: Some(Literal::Primitive(PrimitiveLiteral::Int(114514))),
write_default: None,
}
.into(),
NestedField {
id: 2,
doc: None,
name: "b".to_string(),
required: false,
field_type: Box::new(Type::Primitive(PrimitiveType::String)),
initial_default: None,
write_default: Some(Literal::Primitive(PrimitiveLiteral::String(
"514".to_string(),
))),
}
.into(),
]));
assert_eq!(arrow_type, type_to_arrow_type(&iceberg_type).unwrap());
}
}
}

0 comments on commit 12e12e2

Please sign in to comment.