From bdc66a0e984fdedaa81b252d35c72b3f3cb79047 Mon Sep 17 00:00:00 2001 From: JanKaul Date: Mon, 21 Aug 2023 10:53:05 +0200 Subject: [PATCH] feat: Table metadata (#29) * serde schemav1 & schemav2 * fix default schema id * implement snapshot * add partition spec * add license * add sortorder * fix initial & write default * serialize/deserialize table metadata * impl table metadata * fix docs * fix clippy warnings * change visibility * fix rebase * fix clippy warnings * fix transform * introduce static * fix typo * change spec export style * improve table metadata v1 test * improve table metadata v2 test * delete temp file * export all submodule types at spec module * rename snapshotreference * rename snapshot retention * remove option from properties * remove option from snapshot log * improve builder * introduce enum for manifest list files and manifests * keep retention * use arc for schema * use arc for snapshot * current snapshot returns option * remove panic from snapshot conversion * check if current_snapshot_id is -1 * fix schema * use schema field as fallback in v1 table metadata * use partition spec as fallback in v1 metadata * fix parition spec * introduce _serde module for schema * introduce _serde module for snapshot * introduce _serde module for table_metadata * fix docs * fix typo * use minimal table metadata for v1 test --- crates/iceberg/Cargo.toml | 2 + crates/iceberg/src/lib.rs | 3 + crates/iceberg/src/spec/datatypes.rs | 94 ++- crates/iceberg/src/spec/mod.rs | 21 +- crates/iceberg/src/spec/partition.rs | 103 +++ crates/iceberg/src/spec/schema.rs | 124 +++ crates/iceberg/src/spec/snapshot.rs | 346 ++++++++ crates/iceberg/src/spec/sort.rs | 133 +++ crates/iceberg/src/spec/table_metadata.rs | 976 ++++++++++++++++++++++ 9 files changed, 1783 insertions(+), 19 deletions(-) create mode 100644 crates/iceberg/src/spec/partition.rs create mode 100644 crates/iceberg/src/spec/snapshot.rs create mode 100644 crates/iceberg/src/spec/sort.rs create mode 100644 crates/iceberg/src/spec/table_metadata.rs diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 0b27a5622..053acf87b 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -39,8 +39,10 @@ chrono = "0.4" uuid = "1.4.1" ordered-float = "3.7.0" bitvec = "1.0.1" +serde_repr = "0.1.16" itertools = "0.11" bimap = "0.6" +derive_builder = "0.12.0" [dev-dependencies] diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index fb9db5b68..8e77b9702 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -19,6 +19,9 @@ #![deny(missing_docs)] +#[macro_use] +extern crate derive_builder; + mod error; pub use error::Error; pub use error::ErrorKind; diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index 9b4c986a8..7b24eb057 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -21,10 +21,14 @@ use ::serde::de::{MapAccess, Visitor}; use serde::de::{Error, IntoDeserializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; +use serde_json::Value as JsonValue; use std::cell::OnceCell; +use std::convert::identity; use std::sync::Arc; use std::{collections::HashMap, fmt, ops::Index}; +use super::values::Literal; + /// Field name for list type. pub(crate) const LIST_FILED_NAME: &str = "element"; pub(crate) const MAP_KEY_FIELD_NAME: &str = "key"; @@ -341,8 +345,8 @@ impl fmt::Display for StructType { } } -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] -#[serde(rename_all = "kebab-case")] +#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)] +#[serde(from = "SerdeNestedField", into = "SerdeNestedField")] /// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. /// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type. /// Fields may have an optional comment or doc string. Fields can have default values. @@ -354,17 +358,65 @@ pub struct NestedField { /// Optional or required pub required: bool, /// Datatype - #[serde(rename = "type")] pub field_type: Box, /// Fields may have an optional comment or doc string. - #[serde(skip_serializing_if = "Option::is_none")] pub doc: Option, /// Used to populate the field’s value for all records that were written before the field was added to the schema - #[serde(skip_serializing_if = "Option::is_none")] - pub initial_default: Option, + pub initial_default: Option, /// Used to populate the field’s value for any records written after the field was added to the schema, if the writer does not supply the field’s value + pub write_default: Option, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "kebab-case")] +struct SerdeNestedField { + pub id: i32, + pub name: String, + pub required: bool, + #[serde(rename = "type")] + pub field_type: Box, #[serde(skip_serializing_if = "Option::is_none")] - pub write_default: Option, + pub doc: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub initial_default: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub write_default: Option, +} + +impl From for NestedField { + fn from(value: SerdeNestedField) -> Self { + NestedField { + id: value.id, + name: value.name, + required: value.required, + initial_default: value.initial_default.and_then(|x| { + Literal::try_from_json(x, &value.field_type) + .ok() + .and_then(identity) + }), + write_default: value.write_default.and_then(|x| { + Literal::try_from_json(x, &value.field_type) + .ok() + .and_then(identity) + }), + field_type: value.field_type, + doc: value.doc, + } + } +} + +impl From for SerdeNestedField { + fn from(value: NestedField) -> Self { + SerdeNestedField { + id: value.id, + name: value.name, + required: value.required, + field_type: value.field_type, + doc: value.doc, + initial_default: value.initial_default.map(|x| (&x).into()), + write_default: value.write_default.map(|x| (&x).into()), + } + } } /// Reference to nested field. @@ -427,14 +479,14 @@ impl NestedField { } /// Set the field's initial default value. - pub fn with_initial_default(mut self, value: impl ToString) -> Self { - self.initial_default = Some(value.to_string()); + pub fn with_initial_default(mut self, value: Literal) -> Self { + self.initial_default = Some(value); self } /// Set the field's initial default value. - pub fn with_write_default(mut self, value: impl ToString) -> Self { - self.write_default = Some(value.to_string()); + pub fn with_write_default(mut self, value: Literal) -> Self { + self.write_default = Some(value); self } } @@ -581,6 +633,10 @@ pub struct MapType { #[cfg(test)] mod tests { + use uuid::Uuid; + + use crate::spec::values::PrimitiveLiteral; + use super::*; fn check_type_serde(json: &str, expected_type: Type) { @@ -685,8 +741,12 @@ mod tests { Type::Struct(StructType { fields: vec![ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Uuid)) - .with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb") - .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae") + .with_initial_default(Literal::Primitive(PrimitiveLiteral::UUID( + Uuid::parse_str("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb").unwrap(), + ))) + .with_write_default(Literal::Primitive(PrimitiveLiteral::UUID( + Uuid::parse_str("ec5911be-b0a7-458c-8438-c9a3e53cffae").unwrap(), + ))) .into(), NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(), ], @@ -749,8 +809,12 @@ mod tests { let struct_type = Type::Struct(StructType::new(vec![ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Uuid)) - .with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb") - .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae") + .with_initial_default(Literal::Primitive(PrimitiveLiteral::UUID( + Uuid::parse_str("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb").unwrap(), + ))) + .with_write_default(Literal::Primitive(PrimitiveLiteral::UUID( + Uuid::parse_str("ec5911be-b0a7-458c-8438-c9a3e53cffae").unwrap(), + ))) .into(), NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(), NestedField::required( diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index e87bb1fd4..dedde72e2 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -17,7 +17,20 @@ //! Spec for Iceberg. -pub mod datatypes; -pub mod schema; -pub mod transform; -pub mod values; +mod datatypes; +mod partition; +mod schema; +mod snapshot; +mod sort; +mod table_metadata; +mod transform; +mod values; + +pub use datatypes::*; +pub use partition::*; +pub use schema::*; +pub use snapshot::*; +pub use sort::*; +pub use table_metadata::*; +pub use transform::*; +pub use values::*; diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs new file mode 100644 index 000000000..505f24864 --- /dev/null +++ b/crates/iceberg/src/spec/partition.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Partitioning +*/ +use serde::{Deserialize, Serialize}; + +use super::transform::Transform; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "kebab-case")] +/// Partition fields capture the transform from table data to partition values. +pub struct PartitionField { + /// A source column id from the table’s schema + pub source_id: i32, + /// A partition field id that is used to identify a partition field and is unique within a partition spec. + /// In v2 table metadata, it is unique across all partition specs. + pub field_id: i32, + /// A partition name. + pub name: String, + /// A transform that is applied to the source column to produce a partition value. + pub transform: Transform, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)] +#[serde(rename_all = "kebab-case")] +#[builder(setter(prefix = "with"))] +/// Partition spec that defines how to produce a tuple of partition values from a record. +pub struct PartitionSpec { + /// Identifier for PartitionSpec + pub spec_id: i32, + /// Details of the partition spec + #[builder(setter(each(name = "with_partition_field")))] + pub fields: Vec, +} + +impl PartitionSpec { + /// Create partition spec builer + pub fn builder() -> PartitionSpecBuilder { + PartitionSpecBuilder::default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn partition_spec() { + let sort_order = r#" + { + "spec-id": 1, + "fields": [ { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + }, { + "source-id": 1, + "field-id": 1001, + "name": "id_bucket", + "transform": "bucket[16]" + }, { + "source-id": 2, + "field-id": 1002, + "name": "id_truncate", + "transform": "truncate[4]" + } ] + } + "#; + + let partition_spec: PartitionSpec = serde_json::from_str(sort_order).unwrap(); + assert_eq!(4, partition_spec.fields[0].source_id); + assert_eq!(1000, partition_spec.fields[0].field_id); + assert_eq!("ts_day", partition_spec.fields[0].name); + assert_eq!(Transform::Day, partition_spec.fields[0].transform); + + assert_eq!(1, partition_spec.fields[1].source_id); + assert_eq!(1001, partition_spec.fields[1].field_id); + assert_eq!("id_bucket", partition_spec.fields[1].name); + assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform); + + assert_eq!(2, partition_spec.fields[2].source_id); + assert_eq!(1002, partition_spec.fields[2].field_id); + assert_eq!("id_truncate", partition_spec.fields[2].name); + assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform); + } +} diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index e1c6f2c77..654a10e9b 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -609,6 +609,92 @@ impl SchemaVisitor for IndexByName { } } +pub(super) mod _serde { + /// This is a helper module that defines types to help with serialization/deserialization. + /// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct + /// and then converted into the [Schema] struct. Serialization works the other way around. + /// [SchemaV1] and [SchemaV2] are internal struct that are only used for serialization and deserialization. + use serde::{Deserialize, Serialize}; + + use crate::{spec::StructType, Error, Result}; + + use super::{Schema, DEFAULT_SCHEMA_ID}; + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] + #[serde(rename_all = "kebab-case")] + /// Defines the structure of a v2 schema for serialization/deserialization + pub(crate) struct SchemaV2 { + pub schema_id: i32, + #[serde(skip_serializing_if = "Option::is_none")] + pub identifier_field_ids: Option>, + #[serde(flatten)] + pub fields: StructType, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] + #[serde(rename_all = "kebab-case")] + /// Defines the structure of a v1 schema for serialization/deserialization + pub(crate) struct SchemaV1 { + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub identifier_field_ids: Option>, + #[serde(flatten)] + pub fields: StructType, + } + + impl TryFrom for Schema { + type Error = Error; + fn try_from(value: SchemaV2) -> Result { + dbg!(&value); + Schema::builder() + .with_schema_id(value.schema_id) + .with_fields(value.fields.fields().iter().cloned()) + .with_identifier_field_ids(value.identifier_field_ids.unwrap_or_default()) + .build() + } + } + + impl TryFrom for Schema { + type Error = Error; + fn try_from(value: SchemaV1) -> Result { + Schema::builder() + .with_schema_id(value.schema_id.unwrap_or(DEFAULT_SCHEMA_ID)) + .with_fields(value.fields.fields().iter().cloned()) + .with_identifier_field_ids(value.identifier_field_ids.unwrap_or_default()) + .build() + } + } + + impl From for SchemaV2 { + fn from(value: Schema) -> Self { + SchemaV2 { + schema_id: value.schema_id, + identifier_field_ids: if value.identifier_field_ids.is_empty() { + None + } else { + Some(value.identifier_field_ids.into_iter().collect()) + }, + fields: value.r#struct, + } + } + } + + impl From for SchemaV1 { + fn from(value: Schema) -> Self { + SchemaV1 { + schema_id: Some(value.schema_id), + identifier_field_ids: if value.identifier_field_ids.is_empty() { + None + } else { + Some(value.identifier_field_ids.into_iter().collect()) + }, + fields: value.r#struct, + } + } + } +} + #[cfg(test)] mod tests { use crate::spec::datatypes::Type::{List, Map, Primitive, Struct}; @@ -616,6 +702,7 @@ mod tests { ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, }; use crate::spec::schema::Schema; + use crate::spec::schema::_serde::SchemaV2; use std::collections::HashMap; #[test] @@ -639,6 +726,43 @@ mod tests { assert_eq!(None, schema.field_by_id(3)); } + #[test] + fn schema() { + let record = r#" + { + "type": "struct", + "schema-id": 1, + "fields": [ { + "id": 1, + "name": "id", + "required": true, + "type": "uuid" + }, { + "id": 2, + "name": "data", + "required": false, + "type": "int" + } ] + } + "#; + + let result: SchemaV2 = serde_json::from_str(record).unwrap(); + assert_eq!(1, result.schema_id); + assert_eq!( + Box::new(Type::Primitive(PrimitiveType::Uuid)), + result.fields[0].field_type + ); + assert_eq!(1, result.fields[0].id); + assert!(result.fields[0].required); + + assert_eq!( + Box::new(Type::Primitive(PrimitiveType::Int)), + result.fields[1].field_type + ); + assert_eq!(2, result.fields[1].id); + assert!(!result.fields[1].required); + } + fn table_schema_simple() -> Schema { Schema::builder() .with_schema_id(1) diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs new file mode 100644 index 000000000..9a802883b --- /dev/null +++ b/crates/iceberg/src/spec/snapshot.rs @@ -0,0 +1,346 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Snapshots +*/ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use super::table_metadata::SnapshotLog; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "lowercase")] +/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots. +pub enum Operation { + /// Only data files were added and no files were removed. + Append, + /// Data and delete files were added and removed without changing table data; + /// i.e., compaction, changing the data file format, or relocating data files. + Replace, + /// Data and delete files were added and removed in a logical overwrite operation. + Overwrite, + /// Data files were removed and their contents logically deleted and/or delete files were added to delete rows. + Delete, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +/// Summarises the changes in the snapshot. +pub struct Summary { + /// The type of operation in the snapshot + pub operation: Operation, + /// Other summary data. + #[serde(flatten)] + pub other: HashMap, +} + +impl Default for Operation { + fn default() -> Operation { + Self::Append + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Builder)] +#[builder(setter(prefix = "with"))] +/// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table. +pub struct Snapshot { + /// A unique long ID + snapshot_id: i64, + /// The snapshot ID of the snapshot’s parent. + /// Omitted for any snapshot with no parent + #[builder(default = "None")] + parent_snapshot_id: Option, + /// A monotonically increasing long that tracks the order of + /// changes to a table. + sequence_number: i64, + /// A timestamp when the snapshot was created, used for garbage + /// collection and table inspection + timestamp_ms: i64, + /// The location of a manifest list for this snapshot that + /// tracks manifest files with additional metadata. + manifest_list: ManifestList, + /// A string map that summarizes the snapshot changes, including operation. + summary: Summary, + /// ID of the table’s current schema when the snapshot was created. + #[builder(setter(strip_option))] + schema_id: Option, +} + +/// Type to distinguish between a path to a manifestlist file or a vector of manifestfile locations +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(untagged)] +pub enum ManifestList { + /// Location of manifestlist file + ManifestListFile(String), + /// Manifestfile locations + ManifestFiles(Vec), +} + +impl Snapshot { + /// Get the id of the snapshot + #[inline] + pub fn snapshot_id(&self) -> i64 { + self.snapshot_id + } + /// Get sequence_number of the snapshot. Is 0 for Iceberg V1 tables. + #[inline] + pub fn sequence_number(&self) -> i64 { + self.sequence_number + } + /// Get location of manifest_list file + #[inline] + pub fn manifest_list(&self) -> &ManifestList { + &self.manifest_list + } + /// Get summary of the snapshot + #[inline] + pub fn summary(&self) -> &Summary { + &self.summary + } + /// Get the timestamp of when the snapshot was created + #[inline] + pub fn timestamp(&self) -> i64 { + self.timestamp_ms + } + /// Create snapshot builder + pub fn builder() -> SnapshotBuilder { + SnapshotBuilder::default() + } + + pub(crate) fn log(&self) -> SnapshotLog { + SnapshotLog { + timestamp_ms: self.timestamp_ms, + snapshot_id: self.snapshot_id, + } + } +} + +pub(super) mod _serde { + /// This is a helper module that defines types to help with serialization/deserialization. + /// For deserialization the input first gets read into either the [SnapshotV1] or [SnapshotV2] struct + /// and then converted into the [Snapshot] struct. Serialization works the other way around. + /// [SnapshotV1] and [SnapshotV2] are internal struct that are only used for serialization and deserialization. + use std::collections::HashMap; + + use serde::{Deserialize, Serialize}; + + use crate::{Error, ErrorKind}; + + use super::{ManifestList, Operation, Snapshot, Summary}; + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(rename_all = "kebab-case")] + /// Defines the structure of a v2 snapshot for serialization/deserialization + pub(crate) struct SnapshotV2 { + pub snapshot_id: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub parent_snapshot_id: Option, + pub sequence_number: i64, + pub timestamp_ms: i64, + pub manifest_list: String, + pub summary: Summary, + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_id: Option, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(rename_all = "kebab-case")] + /// Defines the structure of a v1 snapshot for serialization/deserialization + pub(crate) struct SnapshotV1 { + pub snapshot_id: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub parent_snapshot_id: Option, + pub timestamp_ms: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub manifest_list: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub manifests: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub summary: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub schema_id: Option, + } + + impl From for Snapshot { + fn from(v2: SnapshotV2) -> Self { + Snapshot { + snapshot_id: v2.snapshot_id, + parent_snapshot_id: v2.parent_snapshot_id, + sequence_number: v2.sequence_number, + timestamp_ms: v2.timestamp_ms, + manifest_list: ManifestList::ManifestListFile(v2.manifest_list), + summary: v2.summary, + schema_id: v2.schema_id, + } + } + } + + impl From for SnapshotV2 { + fn from(v2: Snapshot) -> Self { + SnapshotV2 { + snapshot_id: v2.snapshot_id, + parent_snapshot_id: v2.parent_snapshot_id, + sequence_number: v2.sequence_number, + timestamp_ms: v2.timestamp_ms, + manifest_list: match v2.manifest_list { + ManifestList::ManifestListFile(file) => file, + ManifestList::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file.") + }, + summary: v2.summary, + schema_id: v2.schema_id, + } + } + } + + impl TryFrom for Snapshot { + type Error = Error; + + fn try_from(v1: SnapshotV1) -> Result { + Ok(Snapshot { + snapshot_id: v1.snapshot_id, + parent_snapshot_id: v1.parent_snapshot_id, + sequence_number: 0, + timestamp_ms: v1.timestamp_ms, + manifest_list: match (v1.manifest_list, v1.manifests) { + (Some(file), _) => ManifestList::ManifestListFile(file), + (None, Some(files)) => ManifestList::ManifestFiles(files), + (None, None) => { + return Err(Error::new( + ErrorKind::DataInvalid, + "Neither manifestlist file or manifest files are provided.", + )) + } + }, + summary: v1.summary.unwrap_or(Summary { + operation: Operation::default(), + other: HashMap::new(), + }), + schema_id: v1.schema_id, + }) + } + } + + impl From for SnapshotV1 { + fn from(v2: Snapshot) -> Self { + let (manifest_list, manifests) = match v2.manifest_list { + ManifestList::ManifestListFile(file) => (Some(file), None), + ManifestList::ManifestFiles(files) => (None, Some(files)), + }; + SnapshotV1 { + snapshot_id: v2.snapshot_id, + parent_snapshot_id: v2.parent_snapshot_id, + timestamp_ms: v2.timestamp_ms, + manifest_list, + manifests, + summary: Some(v2.summary), + schema_id: v2.schema_id, + } + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "kebab-case")] +/// Iceberg tables keep track of branches and tags using snapshot references. +pub struct SnapshotReference { + /// A reference’s snapshot ID. The tagged snapshot or latest snapshot of a branch. + pub snapshot_id: i64, + #[serde(flatten)] + /// Snapshot retention policy + pub retention: SnapshotRetention, +} + +impl SnapshotReference { + /// Create new snapshot reference + pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self { + SnapshotReference { + snapshot_id, + retention, + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "lowercase", tag = "type")] +/// The snapshot expiration procedure removes snapshots from table metadata and applies the table’s retention policy. +pub enum SnapshotRetention { + #[serde(rename_all = "kebab-case")] + /// Branches are mutable named references that can be updated by committing a new snapshot as + /// the branch’s referenced snapshot using the Commit Conflict Resolution and Retry procedures. + Branch { + /// A positive number for the minimum number of snapshots to keep in a branch while expiring snapshots. + /// Defaults to table property history.expire.min-snapshots-to-keep. + #[serde(skip_serializing_if = "Option::is_none")] + min_snapshots_to_keep: Option, + /// A positive number for the max age of snapshots to keep when expiring, including the latest snapshot. + /// Defaults to table property history.expire.max-snapshot-age-ms. + #[serde(skip_serializing_if = "Option::is_none")] + max_snapshot_age_ms: Option, + /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots. + /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires. + #[serde(skip_serializing_if = "Option::is_none")] + max_ref_age_ms: Option, + }, + #[serde(rename_all = "kebab-case")] + /// Tags are labels for individual snapshots. + Tag { + /// For snapshot references except the main branch, a positive number for the max age of the snapshot reference to keep while expiring snapshots. + /// Defaults to table property history.expire.max-ref-age-ms. The main branch never expires. + max_ref_age_ms: i64, + }, +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use crate::spec::snapshot::{ManifestList, Operation, Snapshot, Summary, _serde::SnapshotV1}; + + #[test] + fn schema() { + let record = r#" + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://b/wh/.../s1.avro", + "schema-id": 0 + } + "#; + + let result: Snapshot = serde_json::from_str::(record) + .unwrap() + .try_into() + .unwrap(); + assert_eq!(3051729675574597004, result.snapshot_id()); + assert_eq!(1515100955770, result.timestamp()); + assert_eq!( + Summary { + operation: Operation::Append, + other: HashMap::new() + }, + *result.summary() + ); + assert_eq!( + ManifestList::ManifestListFile("s3://b/wh/.../s1.avro".to_string()), + *result.manifest_list() + ); + } +} diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs new file mode 100644 index 000000000..357e68f44 --- /dev/null +++ b/crates/iceberg/src/spec/sort.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Sorting +*/ +use serde::{Deserialize, Serialize}; + +use super::transform::Transform; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +/// Sort direction in a partition, either ascending or descending +pub enum SortDirection { + /// Ascending + #[serde(rename = "asc")] + Ascending, + /// Descending + #[serde(rename = "desc")] + Descending, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +/// Describes the order of null values when sorted. +pub enum NullOrder { + #[serde(rename = "nulls-first")] + /// Nulls are stored first + First, + #[serde(rename = "nulls-last")] + /// Nulls are stored last + Last, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "kebab-case")] +/// Entry for every column that is to be sorted +pub struct SortField { + /// A source column id from the table’s schema + pub source_id: i64, + /// A transform that is used to produce values to be sorted on from the source column. + pub transform: Transform, + /// A sort direction, that can only be either asc or desc + pub direction: SortDirection, + /// A null order that describes the order of null values when sorted. + pub null_order: NullOrder, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder)] +#[serde(rename_all = "kebab-case")] +#[builder(setter(prefix = "with"))] +/// A sort order is defined by a sort order id and a list of sort fields. +/// The order of the sort fields within the list defines the order in which the sort is applied to the data. +pub struct SortOrder { + /// Identifier for SortOrder, order_id `0` is no sort order. + pub order_id: i64, + /// Details of the sort + #[builder(setter(each(name = "with_sort_field")), default)] + pub fields: Vec, +} + +impl SortOrder { + /// Create sort order builder + pub fn builder() -> SortOrderBuilder { + SortOrderBuilder::default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sort_field() { + let sort_field = r#" + { + "transform": "bucket[4]", + "source-id": 3, + "direction": "desc", + "null-order": "nulls-last" + } + "#; + + let field: SortField = serde_json::from_str(sort_field).unwrap(); + assert_eq!(Transform::Bucket(4), field.transform); + assert_eq!(3, field.source_id); + assert_eq!(SortDirection::Descending, field.direction); + assert_eq!(NullOrder::Last, field.null_order); + } + + #[test] + fn sort_order() { + let sort_order = r#" + { + "order-id": 1, + "fields": [ { + "transform": "identity", + "source-id": 2, + "direction": "asc", + "null-order": "nulls-first" + }, { + "transform": "bucket[4]", + "source-id": 3, + "direction": "desc", + "null-order": "nulls-last" + } ] + } + "#; + + let order: SortOrder = serde_json::from_str(sort_order).unwrap(); + assert_eq!(Transform::Identity, order.fields[0].transform); + assert_eq!(2, order.fields[0].source_id); + assert_eq!(SortDirection::Ascending, order.fields[0].direction); + assert_eq!(NullOrder::First, order.fields[0].null_order); + + assert_eq!(Transform::Bucket(4), order.fields[1].transform); + assert_eq!(3, order.fields[1].source_id); + assert_eq!(SortDirection::Descending, order.fields[1].direction); + assert_eq!(NullOrder::Last, order.fields[1].null_order); + } +} diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs new file mode 100644 index 000000000..40fe11aa2 --- /dev/null +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -0,0 +1,976 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! +Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata). +The main struct here is [TableMetadataV2] which defines the data for a table. +*/ + +use std::{collections::HashMap, sync::Arc}; + +use serde::{Deserialize, Serialize}; +use serde_repr::{Deserialize_repr, Serialize_repr}; +use uuid::Uuid; + +use crate::{Error, ErrorKind}; + +use super::{ + partition::PartitionSpec, + schema::Schema, + snapshot::{Snapshot, SnapshotReference, SnapshotRetention}, + sort::SortOrder, +}; + +use _serde::TableMetadataEnum; + +static MAIN_BRANCH: &str = "main"; +static DEFAULT_SPEC_ID: i32 = 0; + +#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)] +#[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")] +/// Fields for the version 2 of the table metadata. +pub struct TableMetadata { + /// Integer Version for the format. + format_version: FormatVersion, + /// A UUID that identifies the table + table_uuid: Uuid, + /// Location tables base location + location: String, + /// The tables highest sequence number + last_sequence_number: i64, + /// Timestamp in milliseconds from the unix epoch when the table was last updated. + last_updated_ms: i64, + /// An integer; the highest assigned column ID for the table. + last_column_id: i32, + /// A list of schemas, stored as objects with schema-id. + schemas: HashMap>, + /// ID of the table’s current schema. + current_schema_id: i32, + /// A list of partition specs, stored as full partition spec objects. + partition_specs: HashMap, + /// ID of the “current” spec that writers should use by default. + default_spec_id: i32, + /// An integer; the highest assigned partition field ID across all partition specs for the table. + last_partition_id: i32, + ///A string to string map of table properties. This is used to control settings that + /// affect reading and writing and is not intended to be used for arbitrary metadata. + /// For example, commit.retry.num-retries is used to control the number of commit retries. + properties: HashMap, + /// long ID of the current table snapshot; must be the same as the current + /// ID of the main branch in refs. + current_snapshot_id: Option, + ///A list of valid snapshots. Valid snapshots are snapshots for which all + /// data files exist in the file system. A data file must not be deleted + /// from the file system until the last snapshot in which it was listed is + /// garbage collected. + snapshots: Option>>, + /// A list (optional) of timestamp and snapshot ID pairs that encodes changes + /// to the current snapshot for the table. Each time the current-snapshot-id + /// is changed, a new entry should be added with the last-updated-ms + /// and the new current-snapshot-id. When snapshots are expired from + /// the list of valid snapshots, all entries before a snapshot that has + /// expired should be removed. + snapshot_log: Vec, + + /// A list (optional) of timestamp and metadata file location pairs + /// that encodes changes to the previous metadata files for the table. + /// Each time a new metadata file is created, a new entry of the + /// previous metadata file location should be added to the list. + /// Tables can be configured to remove oldest metadata log entries and + /// keep a fixed-size log of the most recent entries after a commit. + metadata_log: Vec, + + /// A list of sort orders, stored as full sort order objects. + sort_orders: HashMap, + /// Default sort order id of the table. Note that this could be used by + /// writers, but is not used when reading because reads use the specs + /// stored in manifest files. + default_sort_order_id: i64, + ///A map of snapshot references. The map keys are the unique snapshot reference + /// names in the table, and the map values are snapshot reference objects. + /// There is always a main branch reference pointing to the current-snapshot-id + /// even if the refs map is null. + refs: HashMap, +} + +impl TableMetadata { + /// Get current schema + #[inline] + pub fn current_schema(&self) -> Result, Error> { + self.schemas + .get(&self.current_schema_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Schema id {} not found!", self.current_schema_id), + ) + }) + .cloned() + } + /// Get default partition spec + #[inline] + pub fn default_partition_spec(&self) -> Result<&PartitionSpec, Error> { + self.partition_specs + .get(&self.default_spec_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Partition spec id {} not found!", self.default_spec_id), + ) + }) + } + + /// Get current snapshot + #[inline] + pub fn current_snapshot(&self) -> Result>, Error> { + match (&self.current_snapshot_id, &self.snapshots) { + (Some(snapshot_id), Some(snapshots)) => Ok(snapshots.get(snapshot_id).cloned()), + (Some(-1), None) => Ok(None), + (None, None) => Ok(None), + (Some(_), None) => Err(Error::new( + ErrorKind::DataInvalid, + "Snapshot id is provided but there are no snapshots".to_string(), + )), + (None, Some(_)) => Err(Error::new( + ErrorKind::DataInvalid, + "There are snapshots but no snapshot id is provided".to_string(), + )), + } + } + + /// Append snapshot to table + pub fn append_snapshot(&mut self, snapshot: Snapshot) -> Result<(), Error> { + self.last_updated_ms = snapshot.timestamp(); + self.last_sequence_number = snapshot.sequence_number(); + + self.refs + .entry(MAIN_BRANCH.to_string()) + .and_modify(|s| { + s.snapshot_id = snapshot.snapshot_id(); + }) + .or_insert_with(|| { + SnapshotReference::new( + snapshot.snapshot_id(), + SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + ) + }); + + if let Some(snapshots) = &mut self.snapshots { + self.snapshot_log.push(snapshot.log()); + snapshots.insert(snapshot.snapshot_id(), Arc::new(snapshot)); + } else { + if !self.snapshot_log.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Snapshot logs is empty while snapshots is not!", + )); + } + + self.snapshot_log = vec![snapshot.log()]; + self.snapshots = Some(HashMap::from_iter(vec![( + snapshot.snapshot_id(), + Arc::new(snapshot), + )])); + } + + Ok(()) + } +} + +pub(super) mod _serde { + /// This is a helper module that defines types to help with serialization/deserialization. + /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct + /// and then converted into the [TableMetadata] struct. Serialization works the other way around. + /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization. + use std::{collections::HashMap, sync::Arc}; + + use serde::{Deserialize, Serialize}; + use uuid::Uuid; + + use crate::{ + spec::{ + schema::_serde::{SchemaV1, SchemaV2}, + snapshot::_serde::{SnapshotV1, SnapshotV2}, + PartitionField, PartitionSpec, Schema, SnapshotReference, SnapshotRetention, SortOrder, + }, + Error, + }; + + use super::{ + FormatVersion, MetadataLog, SnapshotLog, TableMetadata, DEFAULT_SPEC_ID, MAIN_BRANCH, + }; + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(untagged)] + pub(super) enum TableMetadataEnum { + V2(TableMetadataV2), + V1(TableMetadataV1), + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(rename_all = "kebab-case")] + /// Defines the structure of a v2 table metadata for serialization/deserialization + pub(super) struct TableMetadataV2 { + pub format_version: VersionNumber<2>, + pub table_uuid: Uuid, + pub location: String, + pub last_sequence_number: i64, + pub last_updated_ms: i64, + pub last_column_id: i32, + pub schemas: Vec, + pub current_schema_id: i32, + pub partition_specs: Vec, + pub default_spec_id: i32, + pub last_partition_id: i32, + #[serde(skip_serializing_if = "Option::is_none")] + pub properties: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub current_snapshot_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshots: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshot_log: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata_log: Option>, + pub sort_orders: Vec, + pub default_sort_order_id: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub refs: Option>, + } + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + #[serde(rename_all = "kebab-case")] + /// Defines the structure of a v1 table metadata for serialization/deserialization + pub(super) struct TableMetadataV1 { + pub format_version: VersionNumber<1>, + #[serde(skip_serializing_if = "Option::is_none")] + pub table_uuid: Option, + pub location: String, + pub last_updated_ms: i64, + pub last_column_id: i32, + pub schema: SchemaV1, + #[serde(skip_serializing_if = "Option::is_none")] + pub schemas: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub current_schema_id: Option, + pub partition_spec: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub partition_specs: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub default_spec_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_partition_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub properties: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub current_snapshot_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshots: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshot_log: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata_log: Option>, + pub sort_orders: Vec, + pub default_sort_order_id: i64, + } + + /// Helper to serialize and deserialize the format version. + #[derive(Debug, PartialEq, Eq)] + pub(super) struct VersionNumber; + + impl Serialize for VersionNumber { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_u8(V) + } + } + + impl<'de, const V: u8> Deserialize<'de> for VersionNumber { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let value = u8::deserialize(deserializer)?; + if value == V { + Ok(VersionNumber::) + } else { + Err(serde::de::Error::custom("Invalid Version")) + } + } + } + + impl TryFrom for TableMetadata { + type Error = Error; + fn try_from(value: TableMetadataEnum) -> Result { + match value { + TableMetadataEnum::V2(value) => value.try_into(), + TableMetadataEnum::V1(value) => value.try_into(), + } + } + } + + impl From for TableMetadataEnum { + fn from(value: TableMetadata) -> Self { + match value.format_version { + FormatVersion::V2 => TableMetadataEnum::V2(value.into()), + FormatVersion::V1 => TableMetadataEnum::V1(value.into()), + } + } + } + + impl TryFrom for TableMetadata { + type Error = Error; + fn try_from(value: TableMetadataV2) -> Result { + let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id { + None + } else { + value.current_snapshot_id + }; + Ok(TableMetadata { + format_version: FormatVersion::V2, + table_uuid: value.table_uuid, + location: value.location, + last_sequence_number: value.last_sequence_number, + last_updated_ms: value.last_updated_ms, + last_column_id: value.last_column_id, + schemas: HashMap::from_iter( + value + .schemas + .into_iter() + .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?)))) + .collect::, Error>>()? + .into_iter(), + ), + current_schema_id: value.current_schema_id, + partition_specs: HashMap::from_iter( + value.partition_specs.into_iter().map(|x| (x.spec_id, x)), + ), + default_spec_id: value.default_spec_id, + last_partition_id: value.last_partition_id, + properties: value.properties.unwrap_or_default(), + current_snapshot_id, + snapshots: value.snapshots.map(|snapshots| { + HashMap::from_iter( + snapshots + .into_iter() + .map(|x| (x.snapshot_id, Arc::new(x.into()))), + ) + }), + snapshot_log: value.snapshot_log.unwrap_or_default(), + metadata_log: value.metadata_log.unwrap_or_default(), + sort_orders: HashMap::from_iter( + value.sort_orders.into_iter().map(|x| (x.order_id, x)), + ), + default_sort_order_id: value.default_sort_order_id, + refs: value.refs.unwrap_or_else(|| { + if let Some(snapshot_id) = current_snapshot_id { + HashMap::from_iter(vec![( + MAIN_BRANCH.to_string(), + SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } else { + HashMap::new() + } + }), + }) + } + } + + impl TryFrom for TableMetadata { + type Error = Error; + fn try_from(value: TableMetadataV1) -> Result { + let schemas = value + .schemas + .map(|schemas| { + Ok::<_, Error>(HashMap::from_iter( + schemas + .into_iter() + .enumerate() + .map(|(i, schema)| { + Ok(( + schema.schema_id.unwrap_or(i as i32), + Arc::new(schema.try_into()?), + )) + }) + .collect::, Error>>()? + .into_iter(), + )) + }) + .or_else(|| { + Some(value.schema.try_into().map(|schema: Schema| { + HashMap::from_iter(vec![(schema.schema_id(), Arc::new(schema))]) + })) + }) + .transpose()? + .unwrap_or_default(); + let partition_specs = HashMap::from_iter( + value + .partition_specs + .unwrap_or_else(|| { + vec![PartitionSpec { + spec_id: DEFAULT_SPEC_ID, + fields: value.partition_spec, + }] + }) + .into_iter() + .map(|x| (x.spec_id, x)), + ); + Ok(TableMetadata { + format_version: FormatVersion::V1, + table_uuid: value.table_uuid.unwrap_or_default(), + location: value.location, + last_sequence_number: 0, + last_updated_ms: value.last_updated_ms, + last_column_id: value.last_column_id, + current_schema_id: value + .current_schema_id + .unwrap_or_else(|| schemas.keys().copied().max().unwrap_or_default()), + default_spec_id: value + .default_spec_id + .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()), + last_partition_id: value + .last_partition_id + .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()), + partition_specs, + schemas, + + properties: value.properties.unwrap_or_default(), + current_snapshot_id: if let &Some(-1) = &value.current_snapshot_id { + None + } else { + value.current_snapshot_id + }, + snapshots: value + .snapshots + .map(|snapshots| { + Ok::<_, Error>(HashMap::from_iter( + snapshots + .into_iter() + .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?)))) + .collect::, Error>>()?, + )) + }) + .transpose()?, + snapshot_log: value.snapshot_log.unwrap_or_default(), + metadata_log: value.metadata_log.unwrap_or_default(), + sort_orders: HashMap::from_iter( + value.sort_orders.into_iter().map(|x| (x.order_id, x)), + ), + default_sort_order_id: value.default_sort_order_id, + refs: HashMap::from_iter(vec![( + MAIN_BRANCH.to_string(), + SnapshotReference { + snapshot_id: value.current_snapshot_id.unwrap_or_default(), + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]), + }) + } + } + + impl From for TableMetadataV2 { + fn from(v: TableMetadata) -> Self { + TableMetadataV2 { + format_version: VersionNumber::<2>, + table_uuid: v.table_uuid, + location: v.location, + last_sequence_number: v.last_sequence_number, + last_updated_ms: v.last_updated_ms, + last_column_id: v.last_column_id, + schemas: v + .schemas + .into_values() + .map(|x| { + Arc::try_unwrap(x) + .unwrap_or_else(|schema| schema.as_ref().clone()) + .into() + }) + .collect(), + current_schema_id: v.current_schema_id, + partition_specs: v.partition_specs.into_values().collect(), + default_spec_id: v.default_spec_id, + last_partition_id: v.last_partition_id, + properties: if v.properties.is_empty() { + None + } else { + Some(v.properties) + }, + current_snapshot_id: v.current_snapshot_id.or(Some(-1)), + snapshots: v.snapshots.map(|snapshots| { + snapshots + .into_values() + .map(|x| { + Arc::try_unwrap(x) + .unwrap_or_else(|snapshot| snapshot.as_ref().clone()) + .into() + }) + .collect() + }), + snapshot_log: if v.snapshot_log.is_empty() { + None + } else { + Some(v.snapshot_log) + }, + metadata_log: if v.metadata_log.is_empty() { + None + } else { + Some(v.metadata_log) + }, + sort_orders: v.sort_orders.into_values().collect(), + default_sort_order_id: v.default_sort_order_id, + refs: Some(v.refs), + } + } + } + + impl From for TableMetadataV1 { + fn from(v: TableMetadata) -> Self { + TableMetadataV1 { + format_version: VersionNumber::<1>, + table_uuid: Some(v.table_uuid), + location: v.location, + last_updated_ms: v.last_updated_ms, + last_column_id: v.last_column_id, + schema: v + .schemas + .get(&v.current_schema_id) + .unwrap() + .as_ref() + .clone() + .into(), + schemas: Some( + v.schemas + .into_values() + .map(|x| { + Arc::try_unwrap(x) + .unwrap_or_else(|schema| schema.as_ref().clone()) + .into() + }) + .collect(), + ), + current_schema_id: Some(v.current_schema_id), + partition_spec: v + .partition_specs + .get(&v.default_spec_id) + .map(|x| x.fields.clone()) + .unwrap_or_default(), + partition_specs: Some(v.partition_specs.into_values().collect()), + default_spec_id: Some(v.default_spec_id), + last_partition_id: Some(v.last_partition_id), + properties: if v.properties.is_empty() { + None + } else { + Some(v.properties) + }, + current_snapshot_id: v.current_snapshot_id.or(Some(-1)), + snapshots: v.snapshots.map(|snapshots| { + snapshots + .into_values() + .map(|x| { + Arc::try_unwrap(x) + .unwrap_or_else(|snapshot| snapshot.as_ref().clone()) + .into() + }) + .collect() + }), + snapshot_log: if v.snapshot_log.is_empty() { + None + } else { + Some(v.snapshot_log) + }, + metadata_log: if v.metadata_log.is_empty() { + None + } else { + Some(v.metadata_log) + }, + sort_orders: v.sort_orders.into_values().collect(), + default_sort_order_id: v.default_sort_order_id, + } + } + } +} + +#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)] +#[repr(u8)] +/// Iceberg format version +pub enum FormatVersion { + /// Iceberg spec version 1 + V1 = b'1', + /// Iceberg spec version 2 + V2 = b'2', +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "kebab-case")] +/// Encodes changes to the previous metadata files for the table +pub struct MetadataLog { + /// The file for the log. + pub metadata_file: String, + /// Time new metadata was created + pub timestamp_ms: i64, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "kebab-case")] +/// A log of when each snapshot was made. +pub struct SnapshotLog { + /// Id of the snapshot. + pub snapshot_id: i64, + /// Last updated timestamp + pub timestamp_ms: i64, +} + +#[cfg(test)] +mod tests { + + use std::{collections::HashMap, sync::Arc}; + + use anyhow::Result; + use uuid::Uuid; + + use pretty_assertions::assert_eq; + + use crate::spec::{ + table_metadata::TableMetadata, ManifestList, NestedField, Operation, PartitionField, + PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, + SortOrder, Summary, Transform, Type, + }; + + use super::{FormatVersion, MetadataLog, SnapshotLog}; + + fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) { + let desered_type: TableMetadata = serde_json::from_str(json).unwrap(); + assert_eq!(desered_type, expected_type); + + let sered_json = serde_json::to_string(&expected_type).unwrap(); + let parsed_json_value = serde_json::from_str::(&sered_json).unwrap(); + + assert_eq!(parsed_json_value, desered_type); + } + + #[test] + fn test_table_data_v2() { + let data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 1, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 1, + "last-partition-id": 1000, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [], + "default-sort-order-id": 0 + } + "#; + + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![Arc::new(NestedField::required( + 1, + "struct_name", + Type::Primitive(PrimitiveType::Fixed(1)), + ))]) + .build() + .unwrap(); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(1) + .with_partition_field(PartitionField { + name: "ts_day".to_string(), + transform: Transform::Day, + source_id: 4, + field_id: 1000, + }) + .build() + .unwrap(); + + let expected = TableMetadata { + format_version: FormatVersion::V2, + table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(), + location: "s3://b/wh/data.db/table".to_string(), + last_updated_ms: 1515100955770, + last_column_id: 1, + schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]), + current_schema_id: 1, + partition_specs: HashMap::from_iter(vec![(1, partition_spec)]), + default_spec_id: 1, + last_partition_id: 1000, + default_sort_order_id: 0, + sort_orders: HashMap::from_iter(vec![]), + snapshots: None, + current_snapshot_id: None, + last_sequence_number: 1, + properties: HashMap::from_iter(vec![( + "commit.retry.num-retries".to_string(), + "1".to_string(), + )]), + snapshot_log: Vec::new(), + metadata_log: vec![MetadataLog { + metadata_file: "s3://bucket/.../v1.json".to_string(), + timestamp_ms: 1515100, + }], + refs: HashMap::new(), + }; + + check_table_metadata_serde(data, expected); + } + + #[test] + fn test_table_data_v1() { + let data = r#" + { + "format-version" : 1, + "table-uuid" : "df838b92-0b32-465d-a44e-d39936e538b7", + "location" : "/home/iceberg/warehouse/nyc/taxis", + "last-updated-ms" : 1662532818843, + "last-column-id" : 5, + "schema" : { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "vendor_id", + "required" : false, + "type" : "long" + }, { + "id" : 2, + "name" : "trip_id", + "required" : false, + "type" : "long" + }, { + "id" : 3, + "name" : "trip_distance", + "required" : false, + "type" : "float" + }, { + "id" : 4, + "name" : "fare_amount", + "required" : false, + "type" : "double" + }, { + "id" : 5, + "name" : "store_and_fwd_flag", + "required" : false, + "type" : "string" + } ] + }, + "partition-spec" : [ { + "name" : "vendor_id", + "transform" : "identity", + "source-id" : 1, + "field-id" : 1000 + } ], + "last-partition-id" : 1000, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "root" + }, + "current-snapshot-id" : 638933773299822130, + "refs" : { + "main" : { + "snapshot-id" : 638933773299822130, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 638933773299822130, + "timestamp-ms" : 1662532818843, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + } ], + "snapshot-log" : [ { + "timestamp-ms" : 1662532818843, + "snapshot-id" : 638933773299822130 + } ], + "metadata-log" : [ { + "timestamp-ms" : 1662532805245, + "metadata-file" : "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json" + } ] + } + "#; + + let schema = Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "vendor_id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "trip_id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 3, + "trip_distance", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 4, + "fare_amount", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 5, + "store_and_fwd_flag", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(0) + .with_partition_field(PartitionField { + name: "vendor_id".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: 1000, + }) + .build() + .unwrap(); + + let sort_order = SortOrder::builder().with_order_id(0).build().unwrap(); + + let snapshot = Snapshot::builder() + .with_snapshot_id(638933773299822130) + .with_timestamp_ms(1662532818843) + .with_sequence_number(0) + .with_schema_id(0) + .with_manifest_list(ManifestList::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())) + .with_summary(Summary{operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(),"local-1662532784305".to_string()),("added-data-files".to_string(),"4".to_string()),("added-records".to_string(),"4".to_string()),("added-files-size".to_string(),"6001".to_string())])}) + .build().unwrap(); + + let expected = TableMetadata { + format_version: FormatVersion::V1, + table_uuid: Uuid::parse_str("df838b92-0b32-465d-a44e-d39936e538b7").unwrap(), + location: "/home/iceberg/warehouse/nyc/taxis".to_string(), + last_updated_ms: 1662532818843, + last_column_id: 5, + schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]), + current_schema_id: 0, + partition_specs: HashMap::from_iter(vec![(0, partition_spec)]), + default_spec_id: 0, + last_partition_id: 1000, + default_sort_order_id: 0, + sort_orders: HashMap::from_iter(vec![(0, sort_order)]), + snapshots: Some(HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))])), + current_snapshot_id: Some(638933773299822130), + last_sequence_number: 0, + properties: HashMap::from_iter(vec![("owner".to_string(),"root".to_string())]), + snapshot_log: vec![SnapshotLog { + snapshot_id: 638933773299822130, + timestamp_ms: 1662532818843, + }], + metadata_log: vec![MetadataLog{metadata_file:"/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245}], + refs: HashMap::from_iter(vec![("main".to_string(),SnapshotReference{snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None }})]) + }; + + check_table_metadata_serde(data, expected); + } + + #[test] + fn test_invalid_table_uuid() -> Result<()> { + let data = r#" + { + "format-version" : 2, + "table-uuid": "xxxx" + } + "#; + assert!(serde_json::from_str::(data).is_err()); + Ok(()) + } + #[test] + fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> { + let data = r#" + { + "format-version" : 1 + } + "#; + assert!(serde_json::from_str::(data).is_err()); + Ok(()) + } +}