From 1cc23047a6c046e5743913790fdc9985cba7ebb9 Mon Sep 17 00:00:00 2001
From: Christian Thiel
Date: Mon, 26 Aug 2024 20:42:33 +0200
Subject: [PATCH] WIP

---
 crates/iceberg/src/error.rs                   |   6 +
 crates/iceberg/src/spec/mod.rs                |   1 +
 crates/iceberg/src/spec/schema.rs             |  13 +
 crates/iceberg/src/spec/table_metadata.rs     |   7 +
 .../src/spec/table_metadata_builder.rs        | 338 ++++++++++++++++++
 5 files changed, 365 insertions(+)
 create mode 100644 crates/iceberg/src/spec/table_metadata_builder.rs

diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 2b69b4706..a7b03dcb7 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -44,6 +44,11 @@ pub enum ErrorKind {
     ///
     /// This error is returned when given iceberg feature is not supported.
     FeatureUnsupported,
+    /// Validation failed.
+    ///
+    /// This error is returned when Table or View Metadata is manipulated
+    /// in non-allowed ways.
+    ValidationFailed,
 }
 
 impl ErrorKind {
@@ -59,6 +64,7 @@ impl From<ErrorKind> for &'static str {
             ErrorKind::Unexpected => "Unexpected",
             ErrorKind::DataInvalid => "DataInvalid",
             ErrorKind::FeatureUnsupported => "FeatureUnsupported",
+            ErrorKind::ValidationFailed => "ValidationFailed",
         }
     }
 }
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 793f00d34..9b91d5443 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -25,6 +25,7 @@ mod schema;
 mod snapshot;
 mod sort;
 mod table_metadata;
+mod table_metadata_builder;
 mod transform;
 mod values;
 mod view_metadata;
diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs
index 106bfb1d8..b903ec698 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -347,6 +347,19 @@ impl Schema {
     pub fn accessor_by_field_id(&self, field_id: i32) -> Option<Arc<StructAccessor>> {
         self.field_id_to_accessor.get(&field_id).cloned()
     }
+
+    /// Check if this schema is identical to another schema semantically - excluding schema id.
+    pub fn is_same_schema(&self, other: &SchemaRef) -> bool {
+        self.as_struct().eq(other.as_struct())
+            && self.identifier_field_ids().eq(other.identifier_field_ids())
+    }
+
+    /// Change the schema id of this schema.
+    // This is redundant with the `with_schema_id` method on the builder, but useful
+    // as it is infallible in contrast to the builder `build()` method.
+    pub(crate) fn with_schema_id(self, schema_id: SchemaId) -> Self {
+        Self { schema_id, ..self }
+    }
 }
 
 impl Display for Schema {
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index dacd5bcd7..f40b10c52 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -117,6 +117,12 @@ pub struct TableMetadata {
 }
 
 impl TableMetadata {
+    /// Convert this Table Metadata into a builder for modification
+    #[must_use]
+    pub fn into_builder(self) -> super::table_metadata_builder::TableMetadataBuilder {
+        super::table_metadata_builder::TableMetadataBuilder::new_from_metadata(self)
+    }
+
     /// Returns format version of this metadata.
     #[inline]
     pub fn format_version(&self) -> FormatVersion {
@@ -258,6 +264,7 @@ impl TableMetadata {
 
     /// Append snapshot to table
     pub fn append_snapshot(&mut self, snapshot: Snapshot) {
+        // ToDo: Fix - support more branches? Required if we have the new builder?
         self.last_updated_ms = snapshot.timestamp().timestamp_millis();
         self.last_sequence_number = snapshot.sequence_number();
 
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs
new file mode 100644
index 000000000..998cddea6
--- /dev/null
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -0,0 +1,338 @@
+use std::collections::HashMap;
+
+use uuid::Uuid;
+
+use super::{FormatVersion, Schema, Snapshot, TableMetadata};
+use crate::error::{Error, ErrorKind, Result};
+use crate::TableUpdate;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the corresponding update
+/// is omitted from `changes`.
+#[derive(Debug)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+}
+
+impl TableMetadataBuilder {
+    const PROPERTY_FORMAT_VERSION: &'static str = "format-version";
+
+    /// Creates a new table metadata builder from the given table metadata.
+    #[must_use]
+    pub fn new_from_metadata(origin: TableMetadata) -> Self {
+        Self {
+            metadata: origin,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Changes uuid of table metadata.
+    pub fn assign_uuid(&mut self, uuid: Uuid) -> &mut Self {
+        if self.metadata.table_uuid != uuid {
+            self.metadata.table_uuid = uuid;
+            self.changes.push(TableUpdate::AssignUuid { uuid });
+        }
+
+        self
+    }
+
+    /// Upgrade `FormatVersion`. Downgrades are not allowed.
+    ///
+    /// # Errors
+    /// - Cannot downgrade to older format versions.
+    pub fn upgrade_format_version(&mut self, format_version: FormatVersion) -> Result<&mut Self> {
+        if format_version < self.metadata.format_version {
+            return Err(Error::new(
+                ErrorKind::ValidationFailed,
+                format!(
+                    "Cannot downgrade FormatVersion from {} to {}",
+                    self.metadata.format_version, format_version
+                ),
+            ));
+        }
+
+        if format_version != self.metadata.format_version {
+            self.metadata.format_version = format_version;
+            self.changes
+                .push(TableUpdate::UpgradeFormatVersion { format_version });
+        }
+
+        Ok(self)
+    }
+
+    /// Set properties. If a property already exists, it will be overwritten.
+    ///
+    /// If a reserved property is set, the corresponding action is performed and the property is not persisted.
+    /// Currently the following reserved properties are supported:
+    /// * format-version: Set the format version of the table.
+    ///
+    /// # Errors
+    /// - If format-version property is set to a lower version than the current format version.
+    pub fn set_properties(&mut self, properties: HashMap<String, String>) -> Result<&mut Self> {
+        // Handle reserved properties. `properties` is rebound mutably so that reserved keys can be
+        // removed once they are handled. According to spec:
+        // Reserved table properties are only used to control behaviors when creating or updating a table.
+        // The value of these properties are not persisted as a part of the table metadata.
+        let mut properties = properties;
+        properties
+            .remove(Self::PROPERTY_FORMAT_VERSION)
+            .map_or(Ok(()), |format_version| {
+                self.handle_set_format_version_property(&format_version)
+            })?;
+        let properties = properties; // Make immutable for our sanity
+        if properties.is_empty() {
+            // Nothing (left) to persist - do not record a no-op `SetProperties` change.
+            return Ok(self);
+        }
+
+        self.metadata.properties.extend(properties.clone());
+        self.changes.push(TableUpdate::SetProperties {
+            updates: properties,
+        });
+
+        Ok(self)
+    }
+
+    /// Remove properties from the table metadata.
+    /// Does nothing if the key is not present.
+    pub fn remove_properties(&mut self, properties: &[String]) -> &mut Self {
+        for property in properties {
+            self.metadata.properties.remove(property);
+        }
+
+        if !properties.is_empty() {
+            self.changes.push(TableUpdate::RemoveProperties {
+                removals: properties.to_vec(),
+            });
+        }
+
+        self
+    }
+
+    /// Set the location of the table metadata.
+    pub fn set_location(&mut self, location: String) -> &mut Self {
+        if self.metadata.location != location {
+            self.changes.push(TableUpdate::SetLocation {
+                location: location.clone(),
+            });
+            self.metadata.location = location;
+        }
+
+        self
+    }
+
+    /// Add a snapshot to the table metadata.
+    ///
+    /// # Errors
+    /// - No schema has been added to the table metadata.
+    /// - No partition spec has been added to the table metadata.
+    /// - No sort order has been added to the table metadata.
+    /// - Snapshot id already exists.
+    /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far.
+    pub fn add_snapshot(&mut self, snapshot: Snapshot) -> Result<&mut Self> {
+        if self.metadata.schemas.is_empty() {
+            return Err(Error::new(
+                ErrorKind::ValidationFailed,
+                "Attempting to add a snapshot before a schema is added",
+            ));
+        }
+
+        if self.metadata.partition_specs.is_empty() {
+            return Err(Error::new(
+                ErrorKind::ValidationFailed,
+                "Attempting to add a snapshot before a partition spec is added",
+            ));
+        }
+
+        if self.metadata.sort_orders.is_empty() {
+            return Err(Error::new(
+                ErrorKind::ValidationFailed,
+                "Attempting to add a snapshot before a sort order is added",
+            ));
+        }
+
+        if self
+            .metadata
+            .snapshots
+            .contains_key(&snapshot.snapshot_id())
+        {
+            return Err(Error::new(
+                ErrorKind::ValidationFailed,
+                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
+            ));
+        }
+
+        if self.metadata.format_version != FormatVersion::V1
+            && snapshot.sequence_number() <= self.metadata.last_sequence_number
+            && snapshot.parent_snapshot_id().is_some()
+        {
+            return Err(Error::new(
+                ErrorKind::ValidationFailed,
+                format!(
+                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
+                    snapshot.sequence_number(),
+                    self.metadata.last_sequence_number
+                )
+            ));
+        }
+
+        // Mutation happens in next line - must be infallible from here
+        self.changes.push(TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        });
+
+        self.metadata.last_updated_ms = snapshot.timestamp().timestamp_millis();
+        self.metadata.last_sequence_number = snapshot.sequence_number();
+        self.metadata
+            .snapshots
+            .insert(snapshot.snapshot_id(), snapshot.into());
+
+        Ok(self)
+    }
+
+    /// Remove snapshots by their ids from the table metadata.
+    /// Does nothing if a snapshot id is not present.
+    /// Keeps as changes only the snapshots that were actually removed.
+    pub fn remove_snapshots(&mut self, snapshot_ids: &[i64]) -> &mut Self {
+        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
+
+        self.metadata.snapshots.retain(|k, _| {
+            if snapshot_ids.contains(k) {
+                removed_snapshots.push(*k);
+                false
+            } else {
+                true
+            }
+        });
+
+        if !removed_snapshots.is_empty() {
+            self.changes.push(TableUpdate::RemoveSnapshots {
+                snapshot_ids: removed_snapshots,
+            });
+        }
+
+        // Remove refs that are no longer valid
+        self.metadata
+            .refs
+            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
+
+        self
+    }
+
+    /// Add a schema to the table metadata.
+    // ToDo Discuss: Should we add `new_last_column_id` argument?
+    // TLDR; I believe not as it acts as an assertion and its purpose (and source) is not clear.
+    //
+    // Schemas can contain only old columns or a mix of old and new columns.
+    // In Java, if `new_last_column_id` is set but too low, the function would fail, basically hinting
+    // at the schema having been built for an older metadata version. `new_last_column_id` is typically obtained
+    // in the schema building process.
+    //
+    // This assertion is not required if the user controls the flow - they know for which
+    // metadata they created a schema. If asserting the `new_last_column_id` was semantically important, it should be part of the schema and
+    // not be passed around alongside it.
+    //
+    // Specifying `new_last_column_id` in Java also allows setting `metadata.last_column_id` to any arbitrary value
+    // even if it is not present as a column. I believe this to be undesired behavior. This is not possible with the current Rust interface.
+    //
+    // If the schema is built out of sync with the TableMetadata, for example in a REST Catalog setting, the assertion of
+    // the provided `last_column_id` as part of the `TableUpdate::AddSchema` is still done in its `.apply` method.
+    pub fn add_schema(&mut self, schema: Schema) -> Result<&mut Self> {
+        // fn returns a result because I think we should check field-id <-> type compatibility if the field-id
+        // is still present in the metadata. This is not done in the Java code.
+        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
+        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);
+
+        if schema_found {
+            // ToDo Discuss: The Java code for the next bit is confusing and I think it might be wrong for edge cases.
+            // Why is it wrong: The baseline is, that if something changes the state of the builder, it has an effect on it and
+            // must be recorded in the changes.
+            // The Java code might or might not change `lastAddedSchemaId`, and does not record this change in `changes`.
+            // Thus, replaying the changes, would lead to a different result if a schema is added twice in unfavorable
+            // conditions.
+            // We do it differently, but a check from a Java maintainer would be nice.
+
+            if self.last_added_schema_id != Some(new_schema_id) {
+                self.changes.push(TableUpdate::AddSchema {
+                    last_column_id: Some(self.metadata.last_column_id),
+                    schema,
+                });
+                self.last_added_schema_id = Some(new_schema_id);
+            }
+
+            return Ok(self);
+        }
+
+        // New schemas might contain only old columns. In this case last_column_id should not be
+        // reduced.
+        self.metadata.last_column_id =
+            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());
+
+        // ToDo Discuss: Should we check type compatibility if existing field-id is re-used?
+
+        // Set schema-id
+        let schema = match new_schema_id == schema.schema_id() {
+            true => schema,
+            false => schema.with_schema_id(new_schema_id.into()),
+        };
+
+        self.metadata
+            .schemas
+            .insert(new_schema_id, schema.clone().into());
+
+        self.changes.push(TableUpdate::AddSchema {
+            schema,
+            last_column_id: Some(self.metadata.last_column_id),
+        });
+
+        self.last_added_schema_id = Some(new_schema_id);
+
+        Ok(self)
+    }
+
+    fn handle_set_format_version_property(&mut self, format_version: &str) -> Result<()> {
+        // format_version is either "1" or "2" and should not be persisted in the properties.
+        let format_version =
+            serde_json::from_str::<FormatVersion>(format_version).map_err(|_e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid value for format-version property: {}",
+                        format_version
+                    ),
+                )
+            })?;
+
+        self.upgrade_format_version(format_version)?;
+
+        Ok(())
+    }
+
+    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
+        self.metadata
+            .schemas
+            .iter()
+            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
+            .unwrap_or_else(|| self.get_highest_schema_id() + 1)
+    }
+
+    fn get_highest_schema_id(&self) -> i32 {
+        *self
+            .metadata
+            .schemas
+            .keys()
+            .max()
+            .unwrap_or(&self.metadata.current_schema_id)
+    }
+}