From 9562d22a2cc550ecaf04cb857b0ad15e67c94603 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Mon, 28 Oct 2024 16:53:54 -0700 Subject: [PATCH] Traits -> Enum --- crates/catalog/memory/src/catalog.rs | 4 +- crates/catalog/sql/src/catalog.rs | 7 +- .../src/expr/visitors/expression_evaluator.rs | 13 +- .../visitors/inclusive_metrics_evaluator.rs | 6 +- .../src/expr/visitors/inclusive_projection.rs | 16 +- crates/iceberg/src/spec/manifest.rs | 20 +- crates/iceberg/src/spec/partition.rs | 351 ++++++++---------- crates/iceberg/src/spec/table_metadata.rs | 34 +- .../writer/file_writer/location_generator.rs | 4 +- 9 files changed, 202 insertions(+), 253 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index eebce36ff..e4192aae0 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -283,7 +283,7 @@ mod tests { use std::iter::FromIterator; use iceberg::io::FileIOBuilder; - use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type}; use regex::Regex; use tempfile::TempDir; @@ -355,7 +355,7 @@ mod tests { assert_eq!(metadata.current_schema().as_ref(), expected_schema); - let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone()) + let expected_partition_spec = BoundPartitionSpec::builder((*expected_schema).clone()) .with_spec_id(0) .build() .unwrap(); diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index b7976d9d5..abf22ffde 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -781,7 +781,7 @@ mod tests { use std::hash::Hash; use iceberg::io::FileIOBuilder; - use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type}; use iceberg::table::Table; use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; use itertools::Itertools; @@ -874,10 +874,11 @@ mod tests { assert_eq!(metadata.current_schema().as_ref(), expected_schema); - let expected_partition_spec = PartitionSpec::builder(expected_schema) + let expected_partition_spec = BoundPartitionSpec::builder(expected_schema.clone()) .with_spec_id(0) .build() - .unwrap(); + .unwrap() + .into_schemaless(); assert_eq!( metadata diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 94a003ef6..2add5761f 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -258,12 +258,13 @@ mod tests { UnaryExpression, }; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec, - PartitionSpecRef, PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField, + BoundPartitionSpec, BoundPartitionSpecRef, DataContentType, DataFile, DataFileFormat, + Datum, Literal, NestedField, PrimitiveType, Schema, Struct, Transform, Type, + UnboundPartitionField, }; use crate::Result; - fn create_partition_spec(r#type: PrimitiveType) -> Result { + fn create_partition_spec(r#type: PrimitiveType) -> Result { let schema = Schema::builder() .with_fields(vec![Arc::new(NestedField::optional( 1, @@ -272,7 +273,7 @@ mod tests { ))]) .build()?; - let spec = PartitionSpec::builder(schema.clone()) + let spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) @@ -288,7 +289,7 @@ mod tests { } fn create_partition_filter( - partition_spec: PartitionSpecRef, + partition_spec: BoundPartitionSpecRef, predicate: &BoundPredicate, case_sensitive: bool, ) -> Result { @@ -312,7 +313,7 @@ mod tests { } fn create_expression_evaluator( - partition_spec: PartitionSpecRef, + partition_spec: BoundPartitionSpecRef, predicate: &BoundPredicate, case_sensitive: bool, ) -> Result { diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index 89bbd6f56..1cdc75771 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -495,7 +495,7 @@ mod test { UnaryExpression, }; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionSpec, + BoundPartitionSpec, DataContentType, DataFile, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField, }; @@ -1645,7 +1645,7 @@ mod test { assert!(result, "Should read: NotIn on no nulls column"); } - fn create_test_partition_spec() -> Arc { + fn create_test_partition_spec() -> Arc { let table_schema = Schema::builder() .with_fields(vec![Arc::new(NestedField::optional( 1, @@ -1656,7 +1656,7 @@ mod test { .unwrap(); let table_schema_ref = Arc::new(table_schema); - let partition_spec = PartitionSpec::builder(table_schema_ref.clone()) + let partition_spec = BoundPartitionSpec::builder(table_schema_ref.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 7594e6458..7c6e0b2d5 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -235,7 +235,7 @@ mod tests { use crate::expr::visitors::inclusive_projection::InclusiveProjection; use crate::expr::{Bind, Predicate, Reference}; use crate::spec::{ - Datum, NestedField, PartitionSpec, PrimitiveType, Schema, Transform, Type, + BoundPartitionSpec, Datum, NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionField, }; @@ -267,7 +267,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(arc_schema.clone()) + let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .build() .unwrap() @@ -298,7 +298,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(arc_schema.clone()) + let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() @@ -336,7 +336,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(arc_schema.clone()) + let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField { source_id: 2, @@ -372,7 +372,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(arc_schema.clone()) + let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField { source_id: 2, @@ -408,7 +408,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(arc_schema.clone()) + let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField { source_id: 2, @@ -444,7 +444,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(arc_schema.clone()) + let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() @@ -485,7 +485,7 @@ mod tests { let schema = build_test_schema(); let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(arc_schema.clone()) + let partition_spec = BoundPartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 12b177fad..085200b7c 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -30,8 +30,8 @@ use typed_builder::TypedBuilder; use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; use super::{ - Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, PartitionSpec, Schema, - SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, + BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, + Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, }; use crate::error::Result; use crate::io::OutputFile; @@ -706,7 +706,7 @@ pub struct ManifestMetadata { /// ID of the schema used to write the manifest as a string schema_id: SchemaId, /// The partition spec used to write the manifest - partition_spec: PartitionSpec, + partition_spec: BoundPartitionSpec, /// Table format version number of the manifest as a string format_version: FormatVersion, /// Type of content files tracked by the manifest: “data” or “deletes” @@ -773,7 +773,7 @@ impl ManifestMetadata { }) .transpose()? .unwrap_or(0); - PartitionSpec::builder(schema.clone()) + BoundPartitionSpec::builder(schema.clone()) .with_spec_id(spec_id) .add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))? .build()? @@ -1594,7 +1594,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), + partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, @@ -1707,7 +1707,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema) + partition_spec: BoundPartitionSpec::builder(schema) .with_spec_id(0).add_partition_field("v_int", "v_int", Transform::Identity).unwrap() .add_partition_field("v_long", "v_long", Transform::Identity).unwrap().build().unwrap(), content: ManifestContentType::Data, @@ -1818,7 +1818,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 1, schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), + partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V1, }, @@ -1882,7 +1882,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).add_partition_field("category", "category", Transform::Identity).unwrap().build().unwrap(), + partition_spec: BoundPartitionSpec::builder(schema).add_partition_field("category", "category", Transform::Identity).unwrap().build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V1, }, @@ -1961,7 +1961,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), + partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, @@ -2033,7 +2033,7 @@ mod tests { metadata: ManifestMetadata { schema_id: 0, schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), + partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 72e398768..491155949 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -30,8 +30,9 @@ use crate::{Error, ErrorKind, Result}; pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; -/// Reference to [`PartitionSpec`]. -pub type PartitionSpecRef = Arc; +/// Reference to [`BoundPartitionSpec`]. +pub type BoundPartitionSpecRef = Arc; + /// Partition fields capture the transform from table data to partition values. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)] #[serde(rename_all = "kebab-case")] @@ -54,10 +55,23 @@ impl PartitionField { } } +#[derive(Debug, PartialEq, Eq, Clone)] +/// Common interface for partition specs. +/// This enum exposes common functions between [`BoundPartitionSpec`], [`UnboundPartitionSpec`] and [`SchemalessPartitionSpec`]. +/// In most cases, it is better to use the specific struct instead of this enum. +pub enum PartitionSpec<'a> { + /// Bound partition spec + Bound(&'a BoundPartitionSpec), + /// Unbound partition spec + Unbound(&'a UnboundPartitionSpec), + /// Schemaless partition spec + Schemaless(&'a SchemalessPartitionSpec), +} + /// Partition spec that defines how to produce a tuple of partition values from a record. /// `PartitionSpec` is bound to a specific schema. #[derive(Debug, PartialEq, Eq, Clone)] -pub struct PartitionSpec { +pub struct BoundPartitionSpec { /// Identifier for PartitionSpec spec_id: i32, /// Details of the partition spec @@ -83,7 +97,7 @@ pub struct SchemalessPartitionSpec { fields: Vec, } -impl PartitionSpec { +impl BoundPartitionSpec { /// Create partition spec builder pub fn builder(schema: impl Into) -> PartitionSpecBuilder { PartitionSpecBuilder::new(schema) @@ -123,14 +137,16 @@ impl PartitionSpec { /// /// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform. pub fn is_unpartitioned(&self) -> bool { - >::is_unpartitioned(self) + // >::is_unpartitioned(self) + todo!() } /// Turn this partition spec into an unbound partition spec. /// /// The `field_id` is retained as `partition_id` in the unbound partition spec. pub fn into_unbound(self) -> UnboundPartitionSpec { - >::into_unbound(self) + // >::into_unbound(self) + todo!() } /// Turn this partition spec into a preserved partition spec. @@ -147,24 +163,24 @@ impl PartitionSpec { /// * Field names /// * Source column ids /// * Transforms - pub fn is_compatible_with>( - &self, - other: &T, - ) -> bool { - >::is_compatible_with(self, other) + pub fn is_compatible_with<'a, T: Into>>(&self, other: T) -> bool { + let self_spec = PartitionSpec::Bound(self); + self_spec.is_compatible_with(other.into()) } /// Check if this partition spec has sequential partition ids. /// Sequential ids start from 1000 and increment by 1 for each field. /// This is required for spec version 1 pub fn has_sequential_ids(&self) -> bool { - ::has_sequential_ids(self) + // ::has_sequential_ids(self) + todo!() } /// Get the highest field id in the partition spec. /// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999). pub fn highest_field_id(&self) -> i32 { - ::highest_field_id(self) + // ::highest_field_id(self) + todo!() } /// Returns the partition type of this partition spec. @@ -185,7 +201,7 @@ impl SchemalessPartitionSpec { } /// Bind this schemaless partition spec to a schema. - pub fn bind(self, schema: impl Into) -> Result { + pub fn bind(self, schema: impl Into) -> Result { PartitionSpecBuilder::new_from_unbound(self.into_unbound(), schema)?.build() } @@ -201,6 +217,11 @@ impl SchemalessPartitionSpec { pub fn partition_type(&self, schema: &Schema) -> Result { PartitionSpecBuilder::partition_type(&self.fields, schema) } + + /// Convert to unbound partition spec + pub fn into_unbound(self) -> UnboundPartitionSpec { + self.into() + } } /// Reference to [`UnboundPartitionSpec`]. @@ -238,7 +259,7 @@ impl UnboundPartitionSpec { } /// Bind this unbound partition spec to a schema. - pub fn bind(self, schema: impl Into) -> Result { + pub fn bind(self, schema: impl Into) -> Result { PartitionSpecBuilder::new_from_unbound(self, schema)?.build() } @@ -261,23 +282,54 @@ impl UnboundPartitionSpec { } } -/// Trait for common functions between [`PartitionSpec`], [`UnboundPartitionSpec`] and [`PreservedPartitionSpec`] -pub trait UnboundPartitionSpecInterface { - /// Fields of the partition spec - fn fields(&self) -> &[T]; - - /// Turn this partition spec into an unbound partition spec. - fn into_unbound(self) -> UnboundPartitionSpec; - +impl<'a> PartitionSpec<'a> { /// Returns if the partition spec is unpartitioned. /// /// A spec is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform. - fn is_unpartitioned(&self) -> bool { - self.fields().is_empty() - || self - .fields() + pub fn is_unpartitioned(&self) -> bool { + let transforms: Vec<_> = match self { + PartitionSpec::Bound(spec) => spec.fields.iter().map(|f| f.transform).collect(), + PartitionSpec::Unbound(spec) => spec.fields.iter().map(|f| f.transform).collect(), + PartitionSpec::Schemaless(spec) => spec.fields.iter().map(|f| f.transform).collect(), + }; + + transforms.is_empty() || transforms.iter().all(|t| *t == Transform::Void) + } + + /// Number of fields in the partition spec + pub fn len(&self) -> usize { + match self { + PartitionSpec::Bound(spec) => spec.fields.len(), + PartitionSpec::Unbound(spec) => spec.fields.len(), + PartitionSpec::Schemaless(spec) => spec.fields.len(), + } + } + + /// Check if the partition spec is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Internal function to get common properties of fields. + /// Returns (source_id, name, transform) + fn fields(&self) -> Vec<(i32, &str, &Transform)> { + match self { + PartitionSpec::Bound(spec) => spec + .fields + .iter() + .map(|f| (f.source_id, f.name.as_str(), &f.transform)) + .collect(), + PartitionSpec::Unbound(spec) => spec + .fields .iter() - .all(|f| matches!(f.transform(), Transform::Void)) + .map(|f| (f.source_id, f.name.as_str(), &f.transform)) + .collect(), + PartitionSpec::Schemaless(spec) => spec + .fields + .iter() + .map(|f| (f.source_id, f.name.as_str(), &f.transform)) + .collect(), + } } /// Check if this partition spec is compatible with another partition spec. @@ -289,149 +341,25 @@ pub trait UnboundPartitionSpecInterface { /// * Field names /// * Source column ids /// * Transforms - fn is_compatible_with>( - &self, - other: &O, - ) -> bool { - if self.fields().len() != other.fields().len() { + pub fn is_compatible_with(&self, other: PartitionSpec<'_>) -> bool { + if self.len() != other.len() { return false; } - for (this_field, other_field) in self.fields().iter().zip(other.fields()) { - if this_field.source_id() != other_field.source_id() - || this_field.transform() != other_field.transform() - || this_field.name() != other_field.name() - { - return false; - } - } - - true - } -} - -/// Trait for common functions between [`PartitionSpec`] and [`PreservedPartitionSpec`] -pub trait PartitionSpecInterface: UnboundPartitionSpecInterface { - /// Spec id of the partition spec - fn spec_id(&self) -> i32; + let self_fields = self.fields(); + let other_fields = other.fields(); - /// Check if this partition spec has sequential partition ids. - /// Sequential ids start from 1000 and increment by 1 for each field. - /// This is required for spec version 1 - fn has_sequential_ids(&self) -> bool { - for (index, field) in self.fields().iter().enumerate() { - let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64) - .checked_add(1) - .and_then(|id| id.checked_add(index as i64)) - .unwrap_or(i64::MAX); - - if field.field_id as i64 != expected_id { + for (this_field, other_field) in self_fields.iter().zip(other_fields) { + if this_field.0 != other_field.0 + || this_field.1 != other_field.1 + || this_field.2 != other_field.2 + { return false; } } true } - - /// Get the highest field id in the partition spec. - /// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999). - fn highest_field_id(&self) -> i32 { - self.fields() - .iter() - .map(|f| f.field_id) - .max() - .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID) - } -} - -impl PartitionSpecInterface for PartitionSpec { - fn spec_id(&self) -> i32 { - self.spec_id - } -} - -impl PartitionSpecInterface for SchemalessPartitionSpec { - fn spec_id(&self) -> i32 { - self.spec_id - } -} - -impl UnboundPartitionSpecInterface for PartitionSpec { - fn fields(&self) -> &[PartitionField] { - &self.fields - } - - fn into_unbound(self) -> UnboundPartitionSpec { - self.into() - } -} - -impl UnboundPartitionSpecInterface for UnboundPartitionSpec { - fn fields(&self) -> &[UnboundPartitionField] { - &self.fields - } - - fn into_unbound(self) -> UnboundPartitionSpec { - self - } -} - -impl UnboundPartitionSpecInterface for SchemalessPartitionSpec { - fn fields(&self) -> &[PartitionField] { - &self.fields - } - - fn into_unbound(self) -> UnboundPartitionSpec { - self.into() - } -} - -/// Trait for common functions between [`PartitionField`] and [`UnboundPartitionField`] -pub trait PartitionFieldInterface { - /// A source column id from the table’s schema - fn source_id(&self) -> i32; - /// A partition name. - fn name(&self) -> &str; - /// A transform that is applied to the source column to produce a partition value. - fn transform(&self) -> &Transform; - /// Convert to unbound partition field - fn into_unbound(self) -> UnboundPartitionField; -} - -impl PartitionFieldInterface for PartitionField { - fn source_id(&self) -> i32 { - self.source_id - } - - fn name(&self) -> &str { - &self.name - } - - fn transform(&self) -> &Transform { - &self.transform - } - - fn into_unbound(self) -> UnboundPartitionField { - self.into() - } -} - -impl PartitionFieldInterface for UnboundPartitionField { - fn source_id(&self) -> i32 { - self.source_id - } - - fn name(&self) -> &str { - &self.name - } - - fn transform(&self) -> &Transform { - &self.transform - } - - fn into_unbound(self) -> UnboundPartitionField { - self - } } impl From for UnboundPartitionField { @@ -445,8 +373,8 @@ impl From for UnboundPartitionField { } } -impl From for UnboundPartitionSpec { - fn from(spec: PartitionSpec) -> Self { +impl From for UnboundPartitionSpec { + fn from(spec: BoundPartitionSpec) -> Self { UnboundPartitionSpec { spec_id: Some(spec.spec_id), fields: spec.fields.into_iter().map(Into::into).collect(), @@ -463,8 +391,8 @@ impl From for UnboundPartitionSpec { } } -impl From for SchemalessPartitionSpec { - fn from(spec: PartitionSpec) -> Self { +impl From for SchemalessPartitionSpec { + fn from(spec: BoundPartitionSpec) -> Self { SchemalessPartitionSpec { spec_id: spec.spec_id, fields: spec.fields, @@ -472,6 +400,24 @@ impl From for SchemalessPartitionSpec { } } +impl<'a> From<&'a BoundPartitionSpec> for PartitionSpec<'a> { + fn from(spec: &'a BoundPartitionSpec) -> Self { + PartitionSpec::Bound(spec) + } +} + +impl<'a> From<&'a SchemalessPartitionSpec> for PartitionSpec<'a> { + fn from(spec: &'a SchemalessPartitionSpec) -> Self { + PartitionSpec::Schemaless(spec) + } +} + +impl<'a> From<&'a UnboundPartitionSpec> for PartitionSpec<'a> { + fn from(spec: &'a UnboundPartitionSpec) -> Self { + PartitionSpec::Unbound(spec) + } +} + /// Create a new UnboundPartitionSpec #[derive(Debug, Default)] pub struct UnboundPartitionSpecBuilder { @@ -652,10 +598,10 @@ impl PartitionSpecBuilder { } /// Build a bound partition spec with the given schema. - pub fn build(self) -> Result { + pub fn build(self) -> Result { let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?; let partition_type = Self::partition_type(&fields, &self.schema)?; - Ok(PartitionSpec { + Ok(BoundPartitionSpec { spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID), fields, partition_type, @@ -944,7 +890,7 @@ mod tests { ]) .build() .unwrap(); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); @@ -953,7 +899,7 @@ mod tests { "Empty partition spec should be unpartitioned" ); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .add_unbound_fields(vec![ UnboundPartitionField::builder() .source_id(1) @@ -975,7 +921,7 @@ mod tests { "Partition spec with one non void transform should not be unpartitioned" ); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField::builder() @@ -1073,14 +1019,14 @@ mod tests { ]) .build() .unwrap(); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(0) .build() .unwrap(); let partition_type = partition_spec.partition_type(); assert_eq!(0, partition_type.fields().len()); - let unpartition_spec = PartitionSpec::unpartition_spec(schema); + let unpartition_spec = BoundPartitionSpec::unpartition_spec(schema); assert_eq!(partition_spec, unpartition_spec); } @@ -1285,7 +1231,7 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(99) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1337,7 +1283,7 @@ mod tests { ]) .build() .unwrap(); - PartitionSpec::builder(schema.clone()) + BoundPartitionSpec::builder(schema.clone()) .add_unbound_field(UnboundPartitionField { source_id: 1, field_id: Some(1000), @@ -1375,7 +1321,7 @@ mod tests { ]) .build() .unwrap(); - let spec = PartitionSpec::builder(schema.clone()) + let spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1423,19 +1369,19 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder(schema.clone()) + BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - let spec = PartitionSpec::builder(schema.clone()) + let spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16)) .unwrap() .build() .unwrap(); - assert_eq!(spec, PartitionSpec { + assert_eq!(spec, BoundPartitionSpec { spec_id: 1, schema: schema.into(), fields: vec![PartitionField { @@ -1465,12 +1411,12 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder(schema.clone()) + BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - let err = PartitionSpec::builder(schema) + let err = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1498,12 +1444,12 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder(schema.clone()) + BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - PartitionSpec::builder(schema.clone()) + BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1516,7 +1462,7 @@ mod tests { .unwrap(); // Not OK for different source id - PartitionSpec::builder(schema) + BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1550,7 +1496,7 @@ mod tests { .unwrap(); // Valid - PartitionSpec::builder(schema.clone()) + BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField { @@ -1571,7 +1517,7 @@ mod tests { .unwrap(); // Invalid - PartitionSpec::builder(schema) + BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField { @@ -1617,7 +1563,7 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder(schema) + BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1668,7 +1614,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = PartitionSpec::builder(schema.clone()) + let partition_spec_1 = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1680,7 +1626,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = PartitionSpec::builder(schema) + let partition_spec_2 = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1707,7 +1653,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = PartitionSpec::builder(schema.clone()) + let partition_spec_1 = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1719,7 +1665,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = PartitionSpec::builder(schema) + let partition_spec_2 = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1750,7 +1696,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = PartitionSpec::builder(schema.clone()) + let partition_spec_1 = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1762,7 +1708,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = PartitionSpec::builder(schema) + let partition_spec_2 = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1793,7 +1739,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = PartitionSpec::builder(schema.clone()) + let partition_spec_1 = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1812,7 +1758,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = PartitionSpec::builder(schema) + let partition_spec_2 = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1836,10 +1782,11 @@ mod tests { #[test] fn test_highest_field_id_unpartitioned() { - let spec = PartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap()) - .with_spec_id(1) - .build() - .unwrap(); + let spec = + BoundPartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap()) + .with_spec_id(1) + .build() + .unwrap(); assert_eq!(UNPARTITIONED_LAST_ASSIGNED_ID, spec.highest_field_id()); } @@ -1860,7 +1807,7 @@ mod tests { .build() .unwrap(); - let spec = PartitionSpec::builder(schema) + let spec = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1898,7 +1845,7 @@ mod tests { .build() .unwrap(); - let spec = PartitionSpec::builder(schema) + let spec = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1938,7 +1885,7 @@ mod tests { .build() .unwrap(); - let spec = PartitionSpec::builder(schema) + let spec = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1978,7 +1925,7 @@ mod tests { .build() .unwrap(); - let spec = PartitionSpec::builder(schema) + let spec = BoundPartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 7707b13b1..4a2e3ab73 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -31,8 +31,8 @@ use uuid::Uuid; use super::snapshot::SnapshotReference; use super::{ - PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef, Snapshot, - SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID, + BoundPartitionSpec, BoundPartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef, + Snapshot, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID, }; use crate::error::{timestamp_ms_to_utc, Result}; use crate::{Error, ErrorKind, TableCreation}; @@ -120,7 +120,7 @@ pub struct TableMetadata { /// A list of partition specs, stored as full partition spec objects. pub(crate) partition_specs: HashMap, /// ID of the “current” spec that writers should use by default. - pub(crate) default_spec: PartitionSpecRef, + pub(crate) default_spec: BoundPartitionSpecRef, /// An integer; the highest assigned partition field ID across all partition specs for the table. pub(crate) last_partition_id: i32, ///A string to string map of table properties. This is used to control settings that @@ -234,7 +234,7 @@ impl TableMetadata { /// Get default partition spec #[inline] - pub fn default_partition_spec(&self) -> &PartitionSpecRef { + pub fn default_partition_spec(&self) -> &BoundPartitionSpecRef { &self.default_spec } @@ -560,7 +560,7 @@ impl TableMetadataBuilder { } = table_creation; let schema: Arc = Arc::new(schema); - let unpartition_spec = PartitionSpec::unpartition_spec(schema.clone()); + let unpartition_spec = BoundPartitionSpec::unpartition_spec(schema.clone()); let partition_specs = match partition_spec { Some(_) => { return Err(Error::new( @@ -602,7 +602,7 @@ impl TableMetadataBuilder { current_schema_id: schema.schema_id(), schemas: HashMap::from([(schema.schema_id(), schema)]), partition_specs, - default_spec: PartitionSpecRef::new(unpartition_spec), + default_spec: BoundPartitionSpecRef::new(unpartition_spec), last_partition_id: 0, properties, current_snapshot_id: None, @@ -654,7 +654,7 @@ pub(super) mod _serde { use crate::spec::schema::_serde::{SchemaV1, SchemaV2}; use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2}; use crate::spec::{ - PartitionField, PartitionSpec, Schema, SchemaRef, SchemalessPartitionSpec, Snapshot, + BoundPartitionSpec, PartitionField, Schema, SchemaRef, SchemalessPartitionSpec, Snapshot, SnapshotReference, SnapshotRetention, SortOrder, }; use crate::{Error, ErrorKind}; @@ -830,7 +830,7 @@ pub(super) mod _serde { .transpose()? .or_else(|| { (DEFAULT_PARTITION_SPEC_ID == default_spec_id) - .then(|| PartitionSpec::unpartition_spec(current_schema.clone())) + .then(|| BoundPartitionSpec::unpartition_spec(current_schema.clone())) }) .ok_or_else(|| { Error::new( @@ -945,7 +945,7 @@ pub(super) mod _serde { let partition_specs = match value.partition_specs { Some(partition_specs) => partition_specs, - None => vec![PartitionSpec::builder(current_schema.clone()) + None => vec![BoundPartitionSpec::builder(current_schema.clone()) .with_spec_id(DEFAULT_PARTITION_SPEC_ID) .add_unbound_fields(value.partition_spec.into_iter().map(|f| f.into_unbound()))? .build()? @@ -1245,7 +1245,7 @@ mod tests { use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder}; use crate::spec::table_metadata::TableMetadata; use crate::spec::{ - NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema, Snapshot, + BoundPartitionSpec, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type, UnboundPartitionField, }; @@ -1350,7 +1350,7 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "ts_day".to_string(), @@ -1514,7 +1514,7 @@ mod tests { .unwrap(); let schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_partition_field("vendor_id", "vendor_id", Transform::Identity) .unwrap() @@ -1915,7 +1915,7 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(schema2.clone()) + let partition_spec = BoundPartitionSpec::builder(schema2.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -2041,7 +2041,7 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -2124,7 +2124,7 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(schema.clone()) + let partition_spec = BoundPartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -2262,7 +2262,7 @@ mod tests { let default_spec_id = 1234; let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json"); let partition_spec = - PartitionSpec::unpartition_spec(table_meta_data.current_schema().clone()); + BoundPartitionSpec::unpartition_spec(table_meta_data.current_schema().clone()); table_meta_data.default_spec = partition_spec.clone().into(); table_meta_data .partition_specs @@ -2327,7 +2327,7 @@ mod tests { HashMap::from([( 0, Arc::new( - PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone()) + BoundPartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone()) .with_spec_id(0) .build() .unwrap() diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index 1d5dedda1..def18b580 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -132,7 +132,7 @@ pub(crate) mod test { use uuid::Uuid; use super::LocationGenerator; - use crate::spec::{FormatVersion, PartitionSpec, TableMetadata}; + use crate::spec::{BoundPartitionSpec, FormatVersion, TableMetadata}; use crate::writer::file_writer::location_generator::{ FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION, }; @@ -166,7 +166,7 @@ pub(crate) mod test { schemas: HashMap::new(), current_schema_id: 1, partition_specs: HashMap::new(), - default_spec: PartitionSpec::unpartition_spec(schema).into(), + default_spec: BoundPartitionSpec::unpartition_spec(schema).into(), last_partition_id: 1000, default_sort_order_id: 0, sort_orders: HashMap::from_iter(vec![]),