diff --git a/.typos.toml b/.typos.toml new file mode 100644 index 000000000..93a9713e7 --- /dev/null +++ b/.typos.toml @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[files] +extend-exclude = ["**/testdata"] diff --git a/Cargo.toml b/Cargo.toml index 61b82c224..a59a4bb4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ serde_bytes = "0.11.8" serde_derive = "^1.0" serde_json = "^1.0" serde_repr = "0.1.16" +serde_with = "3.4.0" tempfile = "3.8" tokio = { version = "1", features = ["macros"] } typed-builder = "^0.18" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 6d2443ccd..b4867bbe4 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -53,6 +53,7 @@ serde_bytes = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } serde_repr = { workspace = true } +serde_with = { workspace = true } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs new file mode 100644 index 000000000..bc12edff7 --- /dev/null +++ b/crates/iceberg/src/spec/manifest.rs @@ -0,0 +1,1821 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Manifest for Iceberg. +use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; + +use super::{ + FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema, + Struct, +}; +use super::{Literal, UNASSIGNED_SEQUENCE_NUMBER}; +use crate::io::OutputFile; +use crate::spec::PartitionField; +use crate::{Error, ErrorKind}; +use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; +use futures::AsyncWriteExt; +use serde_json::to_vec; +use std::cmp::min; +use std::collections::HashMap; +use std::str::FromStr; + +/// A manifest contains metadata and a list of entries. 
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct Manifest {
+    metadata: ManifestMetadata,
+    entries: Vec<ManifestEntry>,
+}
+
+impl Manifest {
+    /// Parse a manifest from the bytes of an Avro file.
+    pub fn parse_avro(bs: &[u8]) -> Result<Self, Error> {
+        let reader = AvroReader::new(bs)?;
+
+        // Parse manifest metadata
+        let meta = reader.user_metadata();
+        let metadata = ManifestMetadata::parse(meta)?;
+
+        // Parse manifest entries
+        let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
+
+        let entries = match metadata.format_version {
+            FormatVersion::V1 => {
+                let schema = manifest_schema_v1(partition_type.clone())?;
+                let reader = AvroReader::with_schema(&schema, bs)?;
+                reader
+                    .into_iter()
+                    .map(|value| {
+                        from_value::<_serde::ManifestEntryV1>(&value?)?
+                            .try_into(&partition_type, &metadata.schema)
+                    })
+                    .collect::<Result<Vec<_>, Error>>()?
+            }
+            FormatVersion::V2 => {
+                let schema = manifest_schema_v2(partition_type.clone())?;
+                let reader = AvroReader::with_schema(&schema, bs)?;
+                reader
+                    .into_iter()
+                    .map(|value| {
+                        from_value::<_serde::ManifestEntryV2>(&value?)?
+                            .try_into(&partition_type, &metadata.schema)
+                    })
+                    .collect::<Result<Vec<_>, Error>>()?
+            }
+        };
+
+        Ok(Manifest { metadata, entries })
+    }
+}
+
+/// A manifest writer.
+pub struct ManifestWriter {
+    output: OutputFile,
+
+    snapshot_id: i64,
+
+    added_files: u32,
+    added_rows: u64,
+    existing_files: u32,
+    existing_rows: u64,
+    deleted_files: u32,
+    deleted_rows: u64,
+
+    min_seq_num: Option<i64>,
+
+    key_metadata: Vec<u8>,
+
+    field_summary: HashMap<i32, FieldSummary>,
+}
+
+impl ManifestWriter {
+    /// Create a new manifest writer.
+    pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Vec<u8>) -> Self {
+        Self {
+            output,
+            snapshot_id,
+            added_files: 0,
+            added_rows: 0,
+            existing_files: 0,
+            existing_rows: 0,
+            deleted_files: 0,
+            deleted_rows: 0,
+            min_seq_num: None,
+            key_metadata,
+            field_summary: HashMap::new(),
+        }
+    }
+
+    fn update_field_summary(&mut self, entry: &ManifestEntry) {
+        // Update field summary
+        for (&k, &v) in &entry.data_file.null_value_counts {
+            let field_summary = self.field_summary.entry(k).or_default();
+            if v > 0 {
+                field_summary.contains_null = true;
+            }
+        }
+
+        for (&k, &v) in &entry.data_file.nan_value_counts {
+            let field_summary = self.field_summary.entry(k).or_default();
+            if v > 0 {
+                field_summary.contains_nan = Some(true);
+            }
+            if v == 0 {
+                field_summary.contains_nan = Some(false);
+            }
+        }
+
+        for (&k, v) in &entry.data_file.lower_bounds {
+            let field_summary = self.field_summary.entry(k).or_default();
+            if let Some(cur) = &field_summary.lower_bound {
+                if v < cur {
+                    field_summary.lower_bound = Some(v.clone());
+                }
+            } else {
+                field_summary.lower_bound = Some(v.clone());
+            }
+        }
+
+        for (&k, v) in &entry.data_file.upper_bounds {
+            let field_summary = self.field_summary.entry(k).or_default();
+            if let Some(cur) = &field_summary.upper_bound {
+                if v > cur {
+                    field_summary.upper_bound = Some(v.clone());
+                }
+            } else {
+                field_summary.upper_bound = Some(v.clone());
+            }
+        }
+    }
+
+    fn get_field_summary_vec(&mut self, spec_fields: &[PartitionField]) -> Vec<FieldSummary> {
+        let mut partition_summary = Vec::with_capacity(self.field_summary.len());
+        for field in spec_fields {
+            let entry = self
+                .field_summary
+                .remove(&field.source_id)
+                .unwrap_or(FieldSummary::default());
+            partition_summary.push(entry);
+        }
+        partition_summary
+    }
+
+    /// Write the manifest to the output file and return the corresponding [`ManifestListEntry`].
+    pub async fn write(mut self, manifest: Manifest) -> Result<ManifestListEntry, Error> {
+        // Create the avro writer
+        let partition_type = manifest
+            .metadata
+            .partition_spec
+            .partition_type(&manifest.metadata.schema)?;
+        let table_schema = &manifest.metadata.schema;
+        let avro_schema = match manifest.metadata.format_version {
+            FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?,
+            FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?,
+        };
+        let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
+        avro_writer.add_user_metadata(
+            "schema".to_string(),
+            to_vec(table_schema).map_err(|err| {
+                Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
+                    .with_source(err)
+            })?,
+        )?;
+        avro_writer.add_user_metadata(
+            "schema-id".to_string(),
+            table_schema.schema_id().to_string(),
+        )?;
+        avro_writer.add_user_metadata(
+            "partition-spec".to_string(),
+            to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| {
+                Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
+                    .with_source(err)
+            })?,
+        )?;
+        avro_writer.add_user_metadata(
+            "partition-spec-id".to_string(),
+            manifest.metadata.partition_spec.spec_id.to_string(),
+        )?;
+        avro_writer.add_user_metadata(
+            "format-version".to_string(),
+            (manifest.metadata.format_version as u8).to_string(),
+        )?;
+        if manifest.metadata.format_version == FormatVersion::V2 {
+            avro_writer
+                .add_user_metadata("content".to_string(), manifest.metadata.content.to_string())?;
+        }
+
+        // Write manifest entries
+        for entry in manifest.entries {
+            if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
+                && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Manifest entry with status Existing or Deleted should have sequence number",
+                ));
+            }
+
+            match entry.status {
+                ManifestStatus::Added => {
+                    self.added_files += 1;
+                    self.added_rows += entry.data_file.record_count;
+                }
+                ManifestStatus::Deleted => {
+                    self.deleted_files += 1;
+                    self.deleted_rows += entry.data_file.record_count;
+                }
+                ManifestStatus::Existing => {
+                    self.existing_files += 1;
+                    self.existing_rows += entry.data_file.record_count;
+                }
+            }
+
+            if entry.is_alive() {
+                if let Some(seq_num) = entry.sequence_number {
+                    self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num)));
+                }
+            }
+
+            self.update_field_summary(&entry);
+
+            let value = match manifest.metadata.format_version {
+                FormatVersion::V1 => {
+                    to_value(_serde::ManifestEntryV1::try_from(entry, &partition_type)?)?
+                        .resolve(&avro_schema)?
+                }
+                FormatVersion::V2 => {
+                    to_value(_serde::ManifestEntryV2::try_from(entry, &partition_type)?)?
+                        .resolve(&avro_schema)?
+                }
+            };
+
+            avro_writer.append(value)?;
+        }
+
+        let length = avro_writer.flush()?;
+        let content = avro_writer.into_inner()?;
+        let mut writer = self.output.writer().await?;
+        writer.write_all(&content).await.map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "Fail to write Manifest Entry").with_source(err)
+        })?;
+        writer.close().await.map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "Fail to write Manifest Entry").with_source(err)
+        })?;
+
+        let partition_summary =
+            self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
+
+        Ok(ManifestListEntry {
+            manifest_path: self.output.location().to_string(),
+            manifest_length: length as i64,
+            partition_spec_id: manifest.metadata.partition_spec.spec_id,
+            content: manifest.metadata.content,
+            // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replaced with the
+            // real sequence number in `ManifestListWriter`.
+            sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+            min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER),
+            added_snapshot_id: self.snapshot_id,
+            added_data_files_count: Some(self.added_files),
+            existing_data_files_count: Some(self.existing_files),
+            deleted_data_files_count: Some(self.deleted_files),
+            added_rows_count: Some(self.added_rows),
+            existing_rows_count: Some(self.existing_rows),
+            deleted_rows_count: Some(self.deleted_rows),
+            partitions: partition_summary,
+            key_metadata: self.key_metadata,
+        })
+    }
+}
+
+/// This is a helper module that defines the Avro schema fields of a manifest entry.
+mod _const_schema {
+    use std::sync::Arc;
+
+    use apache_avro::Schema as AvroSchema;
+    use once_cell::sync::Lazy;
+
+    use crate::{
+        avro::schema_to_avro_schema,
+        spec::{
+            ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
+        },
+        Error,
+    };
+
+    static STATUS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                0,
+                "status",
+                Type::Primitive(PrimitiveType::Int),
+            ))
+        })
+    };
+
+    static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                1,
+                "snapshot_id",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                1,
+                "snapshot_id",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                3,
+                "sequence_number",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                4,
+                "file_sequence_number",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static CONTENT: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                134,
+                "content",
+                Type::Primitive(PrimitiveType::Int),
+            ))
+        })
+    };
+
+    static FILE_PATH: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                100,
+                "file_path",
+                Type::Primitive(PrimitiveType::String),
+            ))
+        })
+    };
+
+    static FILE_FORMAT: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                101,
+                "file_format",
+                Type::Primitive(PrimitiveType::String),
+            ))
+        })
+    };
+
+    static RECORD_COUNT: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                103,
+                "record_count",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                104,
+                "file_size_in_bytes",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    // Deprecated. Always write a default in v1. Do not write in v2.
+    static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::required(
+                105,
+                "block_size_in_bytes",
+                Type::Primitive(PrimitiveType::Long),
+            ))
+        })
+    };
+
+    static COLUMN_SIZES: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                108,
+                "column_sizes",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        117,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        118,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static VALUE_COUNTS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                109,
+                "value_counts",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        119,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        120,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                110,
+                "null_value_counts",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        121,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        122,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                137,
+                "nan_value_counts",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        138,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        139,
+                        "value",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static LOWER_BOUNDS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                125,
+                "lower_bounds",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        126,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        127,
+                        "value",
+                        Type::Primitive(PrimitiveType::Binary),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static UPPER_BOUNDS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                128,
+                "upper_bounds",
+                Type::Map(MapType {
+                    key_field: Arc::new(NestedField::required(
+                        129,
+                        "key",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    value_field: Arc::new(NestedField::required(
+                        130,
+                        "value",
+                        Type::Primitive(PrimitiveType::Binary),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static KEY_METADATA: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                131,
+                "key_metadata",
+                Type::Primitive(PrimitiveType::Binary),
+            ))
+        })
+    };
+
+    static SPLIT_OFFSETS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                132,
+                "split_offsets",
+                Type::List(ListType {
+                    element_field: Arc::new(NestedField::required(
+                        133,
+                        "element",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static EQUALITY_IDS: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                135,
+                "equality_ids",
+                Type::List(ListType {
+                    element_field: Arc::new(NestedField::required(
+                        136,
+                        "element",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                }),
+            ))
+        })
+    };
+
+    static SORT_ORDER_ID: Lazy<NestedFieldRef> = {
+        Lazy::new(|| {
+            Arc::new(NestedField::optional(
+                140,
+                "sort_order_id",
+                Type::Primitive(PrimitiveType::Int),
+            ))
+        })
+    };
+
+    pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
+        let fields = vec![
+            STATUS.clone(),
+            SNAPSHOT_ID_V2.clone(),
+            SEQUENCE_NUMBER.clone(),
+            FILE_SEQUENCE_NUMBER.clone(),
+            Arc::new(NestedField::required(
+                2,
+                "data_file",
+                Type::Struct(StructType::new(vec![
+                    CONTENT.clone(),
+                    FILE_PATH.clone(),
+                    FILE_FORMAT.clone(),
+                    Arc::new(NestedField::required(
+                        102,
+                        "partition",
+                        Type::Struct(partition_type),
+                    )),
+                    RECORD_COUNT.clone(),
+                    FILE_SIZE_IN_BYTES.clone(),
+                    COLUMN_SIZES.clone(),
+                    VALUE_COUNTS.clone(),
+                    NULL_VALUE_COUNTS.clone(),
+                    NAN_VALUE_COUNTS.clone(),
+                    LOWER_BOUNDS.clone(),
+                    UPPER_BOUNDS.clone(),
+                    KEY_METADATA.clone(),
+                    SPLIT_OFFSETS.clone(),
+                    EQUALITY_IDS.clone(),
+                    SORT_ORDER_ID.clone(),
+                ])),
+            )),
+        ];
+        let schema = Schema::builder().with_fields(fields).build().unwrap();
+        schema_to_avro_schema("manifest", &schema)
+    }
+
+    pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
+        let fields = vec![
+            STATUS.clone(),
+            SNAPSHOT_ID_V1.clone(),
+            Arc::new(NestedField::required(
+                2,
+                "data_file",
+                Type::Struct(StructType::new(vec![
+                    FILE_PATH.clone(),
+                    FILE_FORMAT.clone(),
+                    Arc::new(NestedField::required(
+                        102,
+                        "partition",
+                        Type::Struct(partition_type),
+                    )),
+                    RECORD_COUNT.clone(),
+                    FILE_SIZE_IN_BYTES.clone(),
+                    BLOCK_SIZE_IN_BYTES.clone(),
+                    COLUMN_SIZES.clone(),
+                    VALUE_COUNTS.clone(),
+                    NULL_VALUE_COUNTS.clone(),
+                    NAN_VALUE_COUNTS.clone(),
+                    LOWER_BOUNDS.clone(),
+                    UPPER_BOUNDS.clone(),
+                    KEY_METADATA.clone(),
+                    SPLIT_OFFSETS.clone(),
+                    SORT_ORDER_ID.clone(),
+                ])),
+            )),
+        ];
+        let schema = Schema::builder().with_fields(fields).build().unwrap();
+        schema_to_avro_schema("manifest", &schema)
+    }
+}
+
+/// Metadata of a manifest that is stored in the key-value metadata of the Avro file
+#[derive(Debug, PartialEq, Clone, Eq)]
+pub struct ManifestMetadata {
+    /// The table schema at the time the manifest
+    /// was written
+    schema: Schema,
+    /// ID of the schema used to write the manifest as a string
+    schema_id: i32,
+    /// The partition spec used to write the manifest
+    partition_spec: PartitionSpec,
+    /// Table format version number of the manifest as a string
+    format_version: FormatVersion,
+    /// Type of content files tracked by the manifest: “data” or “deletes”
+    content: ManifestContentType,
+}
+
+impl ManifestMetadata {
+    /// Parse from the key-value metadata in an Avro file.
+    pub fn parse(meta: &HashMap<String, Vec<u8>>) -> Result<Self, Error> {
+        let schema = {
+            let bs = meta.get("schema").ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "schema is required in manifest metadata but not found",
+                )
+            })?;
+            serde_json::from_slice::<Schema>(bs).map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Fail to parse schema in manifest metadata",
+                )
+                .with_source(err)
+            })?
+        };
+        let schema_id: i32 = meta
+            .get("schema-id")
+            .map(|bs| {
+                String::from_utf8_lossy(bs).parse().map_err(|err| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Fail to parse schema id in manifest metadata",
+                    )
+                    .with_source(err)
+                })
+            })
+            .transpose()?
+            .unwrap_or(0);
+        let partition_spec = {
+            let fields = {
+                let bs = meta.get("partition-spec").ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "partition-spec is required in manifest metadata but not found",
+                    )
+                })?;
+                serde_json::from_slice::<Vec<PartitionField>>(bs).map_err(|err| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Fail to parse partition spec in manifest metadata",
+                    )
+                    .with_source(err)
+                })?
+            };
+            let spec_id = meta
+                .get("partition-spec-id")
+                .map(|bs| {
+                    String::from_utf8_lossy(bs).parse().map_err(|err| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            "Fail to parse partition spec id in manifest metadata",
+                        )
+                        .with_source(err)
+                    })
+                })
+                .transpose()?
+                .unwrap_or(0);
+            PartitionSpec { spec_id, fields }
+        };
+        let format_version = if let Some(bs) = meta.get("format-version") {
+            serde_json::from_slice::<FormatVersion>(bs).map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Fail to parse format version in manifest metadata",
+                )
+                .with_source(err)
+            })?
+        } else {
+            FormatVersion::V1
+        };
+        let content = if let Some(v) = meta.get("content") {
+            let v = String::from_utf8_lossy(v);
+            v.parse()?
+        } else {
+            ManifestContentType::Data
+        };
+        Ok(ManifestMetadata {
+            schema,
+            schema_id,
+            partition_spec,
+            format_version,
+            content,
+        })
+    }
+}
+
+/// A manifest is an immutable Avro file that lists data files or delete
+/// files, along with each file’s partition data tuple, metrics, and tracking
+/// information.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct ManifestEntry {
+    /// field: 0
+    ///
+    /// Used to track additions and deletions.
+    status: ManifestStatus,
+    /// field id: 1
+    ///
+    /// Snapshot id where the file was added, or deleted if status is 2.
+    /// Inherited when null.
+    snapshot_id: Option<i64>,
+    /// field id: 3
+    ///
+    /// Data sequence number of the file.
+    /// Inherited when null and status is 1 (added).
+    sequence_number: Option<i64>,
+    /// field id: 4
+    ///
+    /// File sequence number indicating when the file was added.
+    /// Inherited when null and status is 1 (added).
+    file_sequence_number: Option<i64>,
+    /// field id: 2
+    ///
+    /// File path, partition tuple, metrics, …
+    data_file: DataFile,
+}
+
+impl ManifestEntry {
+    /// Check if this manifest entry is alive, i.e. its status is `Added` or `Existing`.
+    pub fn is_alive(&self) -> bool {
+        matches!(
+            self.status,
+            ManifestStatus::Added | ManifestStatus::Existing
+        )
+    }
+}
+
+/// Used to track additions and deletions in ManifestEntry.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum ManifestStatus {
+    /// Value: 0
+    Existing = 0,
+    /// Value: 1
+    Added = 1,
+    /// Value: 2
+    ///
+    /// Deletes are informational only and not used in scans.
+    Deleted = 2,
+}
+
+impl TryFrom<i32> for ManifestStatus {
+    type Error = Error;
+
+    fn try_from(v: i32) -> Result<Self, Self::Error> {
+        match v {
+            0 => Ok(ManifestStatus::Existing),
+            1 => Ok(ManifestStatus::Added),
+            2 => Ok(ManifestStatus::Deleted),
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("manifest status {} is invalid", v),
+            )),
+        }
+    }
+}
+
+/// Data file carries data file path, partition tuple, metrics, …
+#[derive(Debug, PartialEq, Clone, Eq)]
+pub struct DataFile {
+    /// field id: 134
+    ///
+    /// Type of content stored by the data file: data, equality deletes,
+    /// or position deletes (all v1 files are data files)
+    content: DataContentType,
+    /// field id: 100
+    ///
+    /// Full URI for the file with FS scheme
+    file_path: String,
+    /// field id: 101
+    ///
+    /// String file format name, avro, orc or parquet
+    file_format: DataFileFormat,
+    /// field id: 102
+    ///
+    /// Partition data tuple, schema based on the partition spec output using
+    /// partition field ids for the struct field ids
+    partition: Struct,
+    /// field id: 103
+    ///
+    /// Number of records in this file
+    record_count: u64,
+    /// field id: 104
+    ///
+    /// Total file size in bytes
+    file_size_in_bytes: u64,
+    /// field id: 108
+    /// key field id: 117
+    /// value field id: 118
+    ///
+    /// Map from column id to the total size on disk of all regions that
+    /// store the column. Does not include bytes necessary to read other
+    /// columns, like footers.
+    /// Leave null for row-oriented formats (Avro)
+    column_sizes: HashMap<i32, u64>,
+    /// field id: 109
+    /// key field id: 119
+    /// value field id: 120
+    ///
+    /// Map from column id to number of values in the column (including null
+    /// and NaN values)
+    value_counts: HashMap<i32, u64>,
+    /// field id: 110
+    /// key field id: 121
+    /// value field id: 122
+    ///
+    /// Map from column id to number of null values in the column
+    null_value_counts: HashMap<i32, u64>,
+    /// field id: 137
+    /// key field id: 138
+    /// value field id: 139
+    ///
+    /// Map from column id to number of NaN values in the column
+    nan_value_counts: HashMap<i32, u64>,
+    /// field id: 125
+    /// key field id: 126
+    /// value field id: 127
+    ///
+    /// Map from column id to lower bound in the column serialized as binary.
+    /// Each value must be less than or equal to all non-null, non-NaN values
+    /// in the column for the file.
+    ///
+    /// Reference:
+    ///
+    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+    lower_bounds: HashMap<i32, Literal>,
+    /// field id: 128
+    /// key field id: 129
+    /// value field id: 130
+    ///
+    /// Map from column id to upper bound in the column serialized as binary.
+    /// Each value must be greater than or equal to all non-null, non-NaN
+    /// values in the column for the file.
+    ///
+    /// Reference:
+    ///
+    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+    upper_bounds: HashMap<i32, Literal>,
+    /// field id: 131
+    ///
+    /// Implementation-specific key metadata for encryption
+    key_metadata: Vec<u8>,
+    /// field id: 132
+    /// element field id: 133
+    ///
+    /// Split offsets for the data file. For example, all row group offsets
+    /// in a Parquet file. Must be sorted ascending
+    split_offsets: Vec<i64>,
+    /// field id: 135
+    /// element field id: 136
+    ///
+    /// Field ids used to determine row equality in equality delete files.
+    /// Required when content is EqualityDeletes and should be null
+    /// otherwise. Fields with ids listed in this column must be present
+    /// in the delete file
+    equality_ids: Vec<i32>,
+    /// field id: 140
+    ///
+    /// ID representing sort order for this file.
+    ///
+    /// If sort order ID is missing or unknown, then the order is assumed to
+    /// be unsorted. Only data files and equality delete files should be
+    /// written with a non-null order id. Position deletes are required to be
+    /// sorted by file and position, not a table order, and should set sort
+    /// order id to null. Readers must ignore sort order id for position
+    /// delete files.
+    sort_order_id: Option<i32>,
+}
+
+/// Type of content stored by the data file: data, equality deletes, or
+/// position deletes (all v1 files are data files)
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum DataContentType {
+    /// value: 0
+    Data = 0,
+    /// value: 1
+    PositionDeletes = 1,
+    /// value: 2
+    EqualityDeletes = 2,
+}
+
+impl TryFrom<i32> for DataContentType {
+    type Error = Error;
+
+    fn try_from(v: i32) -> Result<Self, Self::Error> {
+        match v {
+            0 => Ok(DataContentType::Data),
+            1 => Ok(DataContentType::PositionDeletes),
+            2 => Ok(DataContentType::EqualityDeletes),
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("data content type {} is invalid", v),
+            )),
+        }
+    }
+}
+
+/// Format of this data.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum DataFileFormat { + /// Avro file format: + Avro, + /// Orc file format: + Orc, + /// Parquet file format: + Parquet, +} + +impl FromStr for DataFileFormat { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "avro" => Ok(Self::Avro), + "orc" => Ok(Self::Orc), + "parquet" => Ok(Self::Parquet), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported data file format: {}", s), + )), + } + } +} + +impl ToString for DataFileFormat { + fn to_string(&self) -> String { + match self { + DataFileFormat::Avro => "avro", + DataFileFormat::Orc => "orc", + DataFileFormat::Parquet => "parquet", + } + .to_string() + } +} + +mod _serde { + use std::collections::HashMap; + + use serde_bytes::ByteBuf; + use serde_derive::{Deserialize, Serialize}; + use serde_with::serde_as; + + use crate::spec::Literal; + use crate::spec::RawLiteral; + use crate::spec::Schema; + use crate::spec::Struct; + use crate::spec::StructType; + use crate::spec::Type; + use crate::Error; + use crate::ErrorKind; + + use super::ManifestEntry; + + #[derive(Serialize, Deserialize)] + pub(super) struct ManifestEntryV2 { + status: i32, + snapshot_id: Option, + sequence_number: Option, + file_sequence_number: Option, + data_file: DataFile, + } + + impl ManifestEntryV2 { + pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result { + Ok(Self { + status: value.status as i32, + snapshot_id: value.snapshot_id, + sequence_number: value.sequence_number, + file_sequence_number: value.file_sequence_number, + data_file: DataFile::try_from(value.data_file, partition_type, false)?, + }) + } + + pub fn try_into( + self, + partition_type: &StructType, + schema: &Schema, + ) -> Result { + Ok(ManifestEntry { + status: self.status.try_into()?, + snapshot_id: self.snapshot_id, + sequence_number: self.sequence_number, + file_sequence_number: self.file_sequence_number, + data_file: self.data_file.try_into(partition_type, schema)?, + }) + } + } + + #[derive(Serialize, Deserialize)] + pub(super) struct ManifestEntryV1 { + status: i32, + pub snapshot_id: i64, + data_file: DataFile, + } + + impl ManifestEntryV1 { + pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result { + Ok(Self { + status: value.status as i32, + snapshot_id: value.snapshot_id.unwrap_or_default(), + data_file: DataFile::try_from(value.data_file, partition_type, true)?, + }) + } + + pub fn try_into( + self, + partition_type: &StructType, + schema: &Schema, + ) -> Result { + Ok(ManifestEntry { + status: self.status.try_into()?, + snapshot_id: Some(self.snapshot_id), + sequence_number: None, + file_sequence_number: None, + data_file: self.data_file.try_into(partition_type, schema)?, + }) + } + } + + #[serde_as] + #[derive(Serialize, Deserialize)] + pub(super) struct DataFile { + #[serde(default)] + content: i32, + file_path: String, + file_format: String, + partition: RawLiteral, + record_count: i64, + file_size_in_bytes: i64, + #[serde(skip_deserializing, skip_serializing_if = "Option::is_none")] + block_size_in_bytes: Option, + column_sizes: Option>, + value_counts: Option>, + null_value_counts: Option>, + nan_value_counts: Option>, + lower_bounds: Option>, + upper_bounds: Option>, + key_metadata: Option, + split_offsets: Option>, + #[serde(default)] + equality_ids: Option>, + sort_order_id: Option, + } + + impl DataFile { + pub fn try_from( + value: super::DataFile, + partition_type: &StructType, + is_version_1: bool, + ) -> Result { + let 
block_size_in_bytes = if is_version_1 { Some(0) } else { None }; + Ok(Self { + content: value.content as i32, + file_path: value.file_path, + file_format: value.file_format.to_string(), + partition: RawLiteral::try_from( + Literal::Struct(value.partition), + &Type::Struct(partition_type.clone()), + )?, + record_count: value.record_count.try_into()?, + file_size_in_bytes: value.file_size_in_bytes.try_into()?, + block_size_in_bytes, + column_sizes: Some(to_i64_entry(value.column_sizes)?), + value_counts: Some(to_i64_entry(value.value_counts)?), + null_value_counts: Some(to_i64_entry(value.null_value_counts)?), + nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?), + lower_bounds: Some(to_bytes_entry(value.lower_bounds)), + upper_bounds: Some(to_bytes_entry(value.upper_bounds)), + key_metadata: Some(serde_bytes::ByteBuf::from(value.key_metadata)), + split_offsets: Some(value.split_offsets), + equality_ids: Some(value.equality_ids), + sort_order_id: value.sort_order_id, + }) + } + pub fn try_into( + self, + partition_type: &StructType, + schema: &Schema, + ) -> Result { + let partition = self + .partition + .try_into(&Type::Struct(partition_type.clone()))? + .map(|v| { + if let Literal::Struct(v) = v { + Ok(v) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + "partition value is not a struct", + )) + } + }) + .transpose()? + .unwrap_or(Struct::empty()); + Ok(super::DataFile { + content: self.content.try_into()?, + file_path: self.file_path, + file_format: self.file_format.parse()?, + partition, + record_count: self.record_count.try_into()?, + file_size_in_bytes: self.file_size_in_bytes.try_into()?, + column_sizes: self + .column_sizes + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), + value_counts: self + .value_counts + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), + null_value_counts: self + .null_value_counts + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), + nan_value_counts: self + .nan_value_counts + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), + lower_bounds: self + .lower_bounds + .map(|v| parse_bytes_entry(v, schema)) + .transpose()? + .unwrap_or_default(), + upper_bounds: self + .upper_bounds + .map(|v| parse_bytes_entry(v, schema)) + .transpose()? + .unwrap_or_default(), + key_metadata: self.key_metadata.map(|v| v.to_vec()).unwrap_or_default(), + split_offsets: self.split_offsets.unwrap_or_default(), + equality_ids: self.equality_ids.unwrap_or_default(), + sort_order_id: self.sort_order_id, + }) + } + } + + #[serde_as] + #[derive(Serialize, Deserialize)] + #[cfg_attr(test, derive(Debug, PartialEq, Eq))] + struct BytesEntry { + key: i32, + value: serde_bytes::ByteBuf, + } + + fn parse_bytes_entry( + v: Vec, + schema: &Schema, + ) -> Result, Error> { + let mut m = HashMap::with_capacity(v.len()); + for entry in v { + let data_type = &schema + .field_by_id(entry.key) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Can't find field id {} for upper/lower_bounds", entry.key), + ) + })? 
+ .field_type; + m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?); + } + Ok(m) + } + + fn to_bytes_entry(v: HashMap) -> Vec { + v.into_iter() + .map(|e| BytesEntry { + key: e.0, + value: Into::::into(e.1), + }) + .collect() + } + + #[derive(Serialize, Deserialize)] + #[cfg_attr(test, derive(Debug, PartialEq, Eq))] + struct I64Entry { + key: i32, + value: i64, + } + + fn parse_i64_entry(v: Vec) -> Result, Error> { + let mut m = HashMap::with_capacity(v.len()); + for entry in v { + m.insert(entry.key, entry.value.try_into()?); + } + Ok(m) + } + + fn to_i64_entry(entries: HashMap) -> Result, Error> { + entries + .iter() + .map(|e| { + Ok(I64Entry { + key: *e.0, + value: (*e.1).try_into()?, + }) + }) + .collect() + } +} + +#[cfg(test)] +mod tests { + use std::fs; + + use tempfile::TempDir; + + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::NestedField; + use crate::spec::PrimitiveType; + use crate::spec::Struct; + use crate::spec::Transform; + use crate::spec::Type; + use std::sync::Arc; + + #[test] + fn test_parse_manifest_v2_unpartition() { + let path = format!( + "{}/testdata/unpartition_manifest_v2.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + // test metadata + assert!(manifest.metadata.schema_id == 0); + assert_eq!(manifest.metadata.schema, { + let fields = vec![ + // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "v_long", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "v_float", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "v_double", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "v_varchar", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 7, + "v_bool", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 8, + "v_date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 9, + "v_timestamp", + Type::Primitive(PrimitiveType::Timestamptz), + )), + Arc::new(NestedField::optional( + 10, + "v_decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 36, + scale: 10, + }), + )), + Arc::new(NestedField::optional( + 11, + "v_ts_ntz", + Type::Primitive(PrimitiveType::Timestamp), + )), + ]; + Schema::builder().with_fields(fields).build().unwrap() + }); + assert!(manifest.metadata.partition_spec.fields.is_empty()); + assert!(manifest.metadata.content == ManifestContentType::Data); + assert!(manifest.metadata.format_version == FormatVersion::V2); + // test entries + assert!(manifest.entries.len() == 1); + let entry = &manifest.entries[0]; + assert!(entry.status == ManifestStatus::Added); + assert!(entry.snapshot_id == Some(0)); + assert!(entry.sequence_number == Some(1)); + assert!(entry.file_sequence_number == Some(1)); + assert_eq!( + entry.data_file, + DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: 
HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]), + value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]), + null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Vec::new(), + split_offsets: vec![4], + equality_ids: Vec::new(), + sort_order_id: None, + } + ); + } + + #[test] + fn test_parse_manifest_v2_partition() { + let path = format!( + "{}/testdata/partition_manifest_v2.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + assert_eq!(manifest.metadata.schema_id, 0); + assert_eq!(manifest.metadata.schema, { + let fields = vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "v_long", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "v_float", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "v_double", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "v_varchar", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 7, + "v_bool", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 8, + "v_date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 9, + "v_timestamp", + Type::Primitive(PrimitiveType::Timestamptz), + )), + Arc::new(NestedField::optional( + 10, + "v_decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 36, + scale: 10, + }), + )), + Arc::new(NestedField::optional( + 11, + "v_ts_ntz", + Type::Primitive(PrimitiveType::Timestamp), + )), + ]; + Schema::builder().with_fields(fields).build().unwrap() + }); + assert_eq!(manifest.metadata.partition_spec, { + let fields = vec![ + PartitionField { + name: "v_int".to_string(), + transform: Transform::Identity, + source_id: 2, + field_id: 1000, + }, + PartitionField { + name: "v_long".to_string(), + transform: Transform::Identity, + source_id: 3, + field_id: 1001, + }, + ]; + PartitionSpec { spec_id: 0, fields } + }); + assert!(manifest.metadata.content == ManifestContentType::Data); + assert!(manifest.metadata.format_version == FormatVersion::V2); + assert_eq!(manifest.entries.len(), 1); + let entry = &manifest.entries[0]; + assert_eq!(entry.status, ManifestStatus::Added); + assert_eq!(entry.snapshot_id, Some(0)); + assert_eq!(entry.sequence_number, Some(1)); + assert_eq!(entry.file_sequence_number, Some(1)); + assert_eq!(entry.data_file.content, DataContentType::Data); + assert_eq!( + entry.data_file.file_path, + "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet" + ); + assert_eq!(entry.data_file.file_format, DataFileFormat::Parquet); + assert_eq!( + entry.data_file.partition, + Struct::from_iter( + vec![ + (1000, Some(Literal::int(1)), "v_int".to_string()), + (1001, Some(Literal::long(1000)), "v_long".to_string()) + ] + .into_iter() + ) + ); + assert_eq!(entry.data_file.record_count, 1); + assert_eq!(entry.data_file.file_size_in_bytes, 5442); + assert_eq!( + entry.data_file.column_sizes, + HashMap::from([ + (0, 73), + (6, 34), + 
(2, 73), + (7, 61), + (3, 61), + (5, 62), + (9, 79), + (10, 73), + (1, 61), + (4, 73), + (8, 73) + ]) + ); + assert_eq!( + entry.data_file.value_counts, + HashMap::from([ + (4, 1), + (5, 1), + (2, 1), + (0, 1), + (3, 1), + (6, 1), + (8, 1), + (1, 1), + (10, 1), + (7, 1), + (9, 1) + ]) + ); + assert_eq!( + entry.data_file.null_value_counts, + HashMap::from([ + (1, 0), + (6, 0), + (2, 0), + (8, 0), + (0, 0), + (3, 0), + (5, 0), + (9, 0), + (7, 0), + (4, 0), + (10, 0) + ]) + ); + assert!(entry.data_file.nan_value_counts.is_empty()); + assert!(entry.data_file.lower_bounds.is_empty()); + assert!(entry.data_file.upper_bounds.is_empty()); + assert!(entry.data_file.key_metadata.is_empty()); + assert_eq!(entry.data_file.split_offsets, vec![4]); + assert!(entry.data_file.equality_ids.is_empty()); + assert_eq!(entry.data_file.sort_order_id, None); + } + + #[test] + fn test_parse_manifest_v1_unpartition() { + let path = format!( + "{}/testdata/unpartition_manifest_v1.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + // test metadata + assert!(manifest.metadata.schema_id == 0); + assert_eq!(manifest.metadata.schema, { + let fields = vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "data", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "comment", + Type::Primitive(PrimitiveType::String), + )), + ]; + Schema::builder() + .with_schema_id(1) + .with_fields(fields) + .build() + .unwrap() + }); + assert!(manifest.metadata.partition_spec.fields.is_empty()); + assert!(manifest.metadata.content == ManifestContentType::Data); + assert!(manifest.metadata.format_version == FormatVersion::V1); + assert_eq!(manifest.entries.len(), 4); + let entry = &manifest.entries[0]; + assert!(entry.status == ManifestStatus::Added); + assert!(entry.snapshot_id == Some(2966623707104393227)); + assert!(entry.sequence_number.is_none()); + assert!(entry.file_sequence_number.is_none()); + assert_eq!( + entry.data_file, + DataFile { + content: DataContentType::Data, + file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 875, + column_sizes: HashMap::from([(1,47),(2,48),(3,52)]), + value_counts: HashMap::from([(1,1),(2,1),(3,1)]), + null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), + upper_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), + key_metadata: vec![], + split_offsets: vec![4], + equality_ids: vec![], + sort_order_id: Some(0), + } + ); + } + + #[test] + fn test_parse_manifest_v1_partition() { + let path = format!( + "{}/testdata/partition_manifest_v1.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + // test metadata + assert!(manifest.metadata.schema_id == 0); + assert_eq!(manifest.metadata.schema, { + let fields = vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "data", + 
Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "category", + Type::Primitive(PrimitiveType::String), + )), + ]; + Schema::builder().with_fields(fields).build().unwrap() + }); + assert_eq!(manifest.metadata.partition_spec, { + let fields = vec![PartitionField { + name: "category".to_string(), + transform: Transform::Identity, + source_id: 3, + field_id: 1000, + }]; + PartitionSpec { spec_id: 0, fields } + }); + assert!(manifest.metadata.content == ManifestContentType::Data); + assert!(manifest.metadata.format_version == FormatVersion::V1); + + // test entries + assert!(manifest.entries.len() == 1); + let entry = &manifest.entries[0]; + assert!(entry.status == ManifestStatus::Added); + assert!(entry.snapshot_id == Some(8205833995881562618)); + assert!(entry.sequence_number.is_none()); + assert!(entry.file_sequence_number.is_none()); + assert_eq!(entry.data_file.content, DataContentType::Data); + assert_eq!( + entry.data_file.file_path, + "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet" + ); + assert_eq!(entry.data_file.file_format, DataFileFormat::Parquet); + assert_eq!( + entry.data_file.partition, + Struct::from_iter( + vec![( + 1000, + Some( + Literal::try_from_bytes(&[120], &Type::Primitive(PrimitiveType::String)) + .unwrap() + ), + "category".to_string() + )] + .into_iter() + ) + ); + assert_eq!(entry.data_file.record_count, 1); + assert_eq!(entry.data_file.file_size_in_bytes, 874); + assert_eq!( + entry.data_file.column_sizes, + HashMap::from([(1, 46), (2, 48), (3, 48)]) + ); + assert_eq!( + entry.data_file.value_counts, + HashMap::from([(1, 1), (2, 1), (3, 1)]) + ); + assert_eq!( + entry.data_file.null_value_counts, + HashMap::from([(1, 0), (2, 0), (3, 0)]) + ); + assert_eq!(entry.data_file.nan_value_counts, HashMap::new()); + assert_eq!( + entry.data_file.lower_bounds, + HashMap::from([ + (1, Literal::long(1)), + (2, Literal::string("a")), + (3, Literal::string("x")) + ]) + ); + assert_eq!( + entry.data_file.upper_bounds, + HashMap::from([ + (1, Literal::long(1)), + (2, Literal::string("a")), + (3, Literal::string("x")) + ]) + ); + assert!(entry.data_file.key_metadata.is_empty()); + assert_eq!(entry.data_file.split_offsets, vec![4]); + assert!(entry.data_file.equality_ids.is_empty()); + assert_eq!(entry.data_file.sort_order_id, Some(0)); + } + + #[tokio::test] + async fn test_writer_manifest_v1_partition() { + // Read manifest + let path = format!( + "{}/testdata/partition_manifest_v1.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + + // Write manifest + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("manifest_list_v1.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let writer = ManifestWriter::new(output_file, 1, vec![]); + let entry = writer.write(manifest.clone()).await.unwrap(); + + // Check partition summary + assert_eq!(entry.partitions.len(), 1); + assert_eq!(entry.partitions[0].lower_bound, Some(Literal::string("x"))); + assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x"))); + + // Verify manifest + let bs = fs::read(path).expect("read_file must succeed"); + let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + + assert_eq!(actual_manifest, manifest); + } + + #[tokio::test] + async fn test_writer_manifest_v2_partition() 
{
+        // Read manifest
+        let path = format!(
+            "{}/testdata/partition_manifest_v2.avro",
+            env!("CARGO_MANIFEST_DIR")
+        );
+        let bs = fs::read(path).expect("read_file must succeed");
+        let manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
+
+        // Write manifest
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("manifest_list_v2.avro");
+        let io = FileIOBuilder::new_fs_io().build().unwrap();
+        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
+        let writer = ManifestWriter::new(output_file, 1, vec![]);
+        let res = writer.write(manifest.clone()).await.unwrap();
+        assert_eq!(res.sequence_number, UNASSIGNED_SEQUENCE_NUMBER);
+        assert_eq!(res.min_sequence_number, 1);
+
+        // Verify manifest
+        let bs = fs::read(path).expect("read_file must succeed");
+        let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
+
+        assert_eq!(actual_manifest, manifest);
+    }
+}
diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index f20fd9bf1..db0c30c2c 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -17,9 +17,9 @@
 //! ManifestList for Iceberg.
 
-use std::collections::HashMap;
+use std::{collections::HashMap, str::FromStr};
 
-use crate::{io::OutputFile, spec::Literal, Error};
+use crate::{io::OutputFile, spec::Literal, Error, ErrorKind};
 use apache_avro::{from_value, types::Value, Reader, Writer};
 use futures::AsyncWriteExt;
 
@@ -30,6 +30,9 @@ use self::{
 
 use super::{FormatVersion, StructType};
 
+/// Placeholder for a sequence number. A field with this value must be replaced with the actual sequence number before it is written.
+pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1;
+
 /// Snapshots are embedded in table metadata, but the list of manifests for a
 /// snapshot are stored in a separate manifest list file.
 ///
@@ -81,6 +84,8 @@ pub struct ManifestListWriter {
     format_version: FormatVersion,
     output_file: OutputFile,
     avro_writer: Writer<'static, Vec<u8>>,
+    sequence_number: i64,
+    snapshot_id: i64,
 }
 
 impl std::fmt::Debug for ManifestListWriter {
@@ -104,7 +109,7 @@ impl ManifestListWriter {
             ),
             ("format-version".to_string(), "1".to_string()),
         ]);
-        Self::new(FormatVersion::V1, output_file, metadata)
+        Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
     }
 
     /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
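Editor's note on the sequence-number flow introduced by this diff: `ManifestWriter::write` stamps `UNASSIGNED_SEQUENCE_NUMBER` into the `ManifestListEntry` it returns, and `ManifestListWriter::v2` later substitutes the real sequence number while writing the manifest list, rejecting placeholder entries that come from a different snapshot. A minimal end-to-end sketch, pieced together from the tests in this diff; the paths, snapshot id, and sequence number are placeholders, and the final `close()` call assumes the pre-existing `ManifestListWriter` flush API that this diff does not show:

```rust
use iceberg::io::FileIOBuilder;
use iceberg::spec::{Manifest, ManifestListWriter, ManifestWriter};

async fn write_snapshot(manifest: Manifest) -> Result<(), iceberg::Error> {
    let io = FileIOBuilder::new_fs_io().build()?;

    // Step 1: write the manifest file. The returned ManifestListEntry carries
    // UNASSIGNED_SEQUENCE_NUMBER as its sequence_number placeholder.
    let manifest_out = io.new_output("/tmp/demo-m0.avro")?;
    let entry = ManifestWriter::new(manifest_out, /* snapshot_id */ 1, vec![])
        .write(manifest)
        .await?;

    // Step 2: write the manifest list. The placeholder is replaced with the
    // sequence number given here; the `0` mirrors the third argument used in
    // the v2 test above.
    let list_out = io.new_output("/tmp/demo-snap.avro")?;
    let mut list_writer =
        ManifestListWriter::v2(list_out, /* snapshot_id */ 1, 0, /* sequence_number */ 1);
    list_writer.add_manifest_entries(std::iter::once(entry))?;
    list_writer.close().await?; // assumed flush API, not shown in this diff
    Ok(())
}
```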
@@ -123,13 +128,21 @@ impl ManifestListWriter { ("sequence-number".to_string(), sequence_number.to_string()), ("format-version".to_string(), "2".to_string()), ]); - Self::new(FormatVersion::V2, output_file, metadata) + Self::new( + FormatVersion::V2, + output_file, + metadata, + sequence_number, + snapshot_id, + ) } fn new( format_version: FormatVersion, output_file: OutputFile, metadata: HashMap, + sequence_number: i64, + snapshot_id: i64, ) -> Self { let avro_schema = match format_version { FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1, @@ -145,6 +158,8 @@ impl ManifestListWriter { format_version, output_file, avro_writer, + sequence_number, + snapshot_id, } } @@ -156,12 +171,36 @@ impl ManifestListWriter { match self.format_version { FormatVersion::V1 => { for manifest_entry in manifest_entries { - let manifest_entry: ManifestListEntryV1 = manifest_entry.into(); + let manifest_entry: ManifestListEntryV1 = manifest_entry.try_into()?; self.avro_writer.append_ser(manifest_entry)?; } } FormatVersion::V2 => { - for manifest_entry in manifest_entries { + for mut manifest_entry in manifest_entries { + if manifest_entry.sequence_number == UNASSIGNED_SEQUENCE_NUMBER { + if manifest_entry.added_snapshot_id != self.snapshot_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Found unassigned sequence number for a manifest from snapshot {}.", + manifest_entry.added_snapshot_id + ), + )); + } + manifest_entry.sequence_number = self.sequence_number; + } + if manifest_entry.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER { + if manifest_entry.added_snapshot_id != self.snapshot_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Found unassigned sequence number for a manifest from snapshot {}.", + manifest_entry.added_snapshot_id + ), + )); + } + manifest_entry.min_sequence_number = self.sequence_number; + } let manifest_entry: ManifestListEntryV2 = manifest_entry.try_into()?; self.avro_writer.append_ser(manifest_entry)?; } @@ -466,80 +505,80 @@ pub struct ManifestListEntry { /// field: 500 /// /// Location of the manifest file - manifest_path: String, + pub manifest_path: String, /// field: 501 /// /// Length of the manifest file in bytes - manifest_length: i64, + pub manifest_length: i64, /// field: 502 /// /// ID of a partition spec used to write the manifest; must be listed /// in table metadata partition-specs - partition_spec_id: i32, + pub partition_spec_id: i32, /// field: 517 /// /// The type of files tracked by the manifest, either data or delete /// files; 0 for all v1 manifests - content: ManifestContentType, + pub content: ManifestContentType, /// field: 515 /// /// The sequence number when the manifest was added to the table; use 0 /// when reading v1 manifest lists - sequence_number: i64, + pub sequence_number: i64, /// field: 516 /// /// The minimum data sequence number of all live data or delete files in /// the manifest; use 0 when reading v1 manifest lists - min_sequence_number: i64, + pub min_sequence_number: i64, /// field: 503 /// /// ID of the snapshot where the manifest file was added - added_snapshot_id: i64, + pub added_snapshot_id: i64, /// field: 504 /// /// Number of entries in the manifest that have status ADDED, when null /// this is assumed to be non-zero - added_data_files_count: Option, + pub added_data_files_count: Option, /// field: 505 /// /// Number of entries in the manifest that have status EXISTING (0), /// when null this is assumed to be non-zero - existing_data_files_count: Option, + pub existing_data_files_count: 
Option, /// field: 506 /// /// Number of entries in the manifest that have status DELETED (2), /// when null this is assumed to be non-zero - deleted_data_files_count: Option, + pub deleted_data_files_count: Option, /// field: 512 /// /// Number of rows in all of files in the manifest that have status /// ADDED, when null this is assumed to be non-zero - added_rows_count: Option, + pub added_rows_count: Option, /// field: 513 /// /// Number of rows in all of files in the manifest that have status /// EXISTING, when null this is assumed to be non-zero - existing_rows_count: Option, + pub existing_rows_count: Option, /// field: 514 /// /// Number of rows in all of files in the manifest that have status /// DELETED, when null this is assumed to be non-zero - deleted_rows_count: Option, + pub deleted_rows_count: Option, /// field: 507 /// element_field: 508 /// /// A list of field summaries for each partition field in the spec. Each /// field in the list corresponds to a field in the manifest file’s /// partition spec. - partitions: Vec, + pub partitions: Vec, /// field: 519 /// /// Implementation-specific key metadata for encryption - key_metadata: Vec, + pub key_metadata: Vec, } /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Clone, Eq)] pub enum ManifestContentType { /// The manifest content is data. Data = 0, @@ -547,6 +586,30 @@ pub enum ManifestContentType { Deletes = 1, } +impl FromStr for ManifestContentType { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "data" => Ok(ManifestContentType::Data), + "deletes" => Ok(ManifestContentType::Deletes), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid manifest content type: {s}"), + )), + } + } +} + +impl ToString for ManifestContentType { + fn to_string(&self) -> String { + match self { + ManifestContentType::Data => "data".to_string(), + ManifestContentType::Deletes => "deletes".to_string(), + } + } +} + impl TryFrom for ManifestContentType { type Error = Error; @@ -568,25 +631,25 @@ impl TryFrom for ManifestContentType { /// Field summary for partition field in the spec. /// /// Each field in the list corresponds to a field in the manifest file’s partition spec. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Default)] pub struct FieldSummary { /// field: 509 /// /// Whether the manifest contains at least one partition with a null /// value for the field - contains_null: bool, + pub contains_null: bool, /// field: 518 /// Whether the manifest contains at least one partition with a NaN /// value for the field - contains_nan: Option, + pub contains_nan: Option, /// field: 510 /// The minimum value for the field in the manifests /// partitions. - lower_bound: Option, + pub lower_bound: Option, /// field: 511 /// The maximum value for the field in the manifests /// partitions. - upper_bound: Option, + pub upper_bound: Option, } /// This is a helper module that defines types to help with serialization/deserialization. 
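Editor's note: the `FromStr`/`ToString` pair added above is what round-trips the `content` key in the manifest's Avro user metadata (`ManifestWriter::write` serializes it, `ManifestMetadata::parse` reads it back). A small sketch of the expected behavior; the function name is illustrative:

```rust
use std::str::FromStr;
use iceberg::spec::ManifestContentType;

fn content_type_round_trip() -> Result<(), iceberg::Error> {
    // Only "data" and "deletes" parse; anything else is ErrorKind::DataInvalid.
    let content = ManifestContentType::from_str("data")?;
    assert_eq!(content, ManifestContentType::Data);
    assert_eq!(content.to_string(), "data");
    assert!(ManifestContentType::from_str("delete").is_err());
    Ok(())
}
```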
@@ -658,11 +721,17 @@ pub(super) mod _serde { } } - impl From for ManifestListV1 { - fn from(value: super::ManifestList) -> Self { - Self { - entries: value.entries.into_iter().map(Into::into).collect(), - } + impl TryFrom for ManifestListV1 { + type Error = Error; + + fn try_from(value: super::ManifestList) -> Result { + Ok(Self { + entries: value + .entries + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + }) } } @@ -769,12 +838,12 @@ pub(super) mod _serde { sequence_number: self.sequence_number, min_sequence_number: self.min_sequence_number, added_snapshot_id: self.added_snapshot_id, - added_data_files_count: Some(self.added_data_files_count), - existing_data_files_count: Some(self.existing_data_files_count), - deleted_data_files_count: Some(self.deleted_data_files_count), - added_rows_count: Some(self.added_rows_count), - existing_rows_count: Some(self.existing_rows_count), - deleted_rows_count: Some(self.deleted_rows_count), + added_data_files_count: Some(self.added_data_files_count.try_into()?), + existing_data_files_count: Some(self.existing_data_files_count.try_into()?), + deleted_data_files_count: Some(self.deleted_data_files_count.try_into()?), + added_rows_count: Some(self.added_rows_count.try_into()?), + existing_rows_count: Some(self.existing_rows_count.try_into()?), + deleted_rows_count: Some(self.deleted_rows_count.try_into()?), partitions, key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), }) @@ -791,12 +860,24 @@ pub(super) mod _serde { manifest_length: self.manifest_length, partition_spec_id: self.partition_spec_id, added_snapshot_id: self.added_snapshot_id, - added_data_files_count: self.added_data_files_count, - existing_data_files_count: self.existing_data_files_count, - deleted_data_files_count: self.deleted_data_files_count, - added_rows_count: self.added_rows_count, - existing_rows_count: self.existing_rows_count, - deleted_rows_count: self.deleted_rows_count, + added_data_files_count: self + .added_data_files_count + .map(TryInto::try_into) + .transpose()?, + existing_data_files_count: self + .existing_data_files_count + .map(TryInto::try_into) + .transpose()?, + deleted_data_files_count: self + .deleted_data_files_count + .map(TryInto::try_into) + .transpose()?, + added_rows_count: self.added_rows_count.map(TryInto::try_into).transpose()?, + existing_rows_count: self + .existing_rows_count + .map(TryInto::try_into) + .transpose()?, + deleted_rows_count: self.deleted_rows_count.map(TryInto::try_into).transpose()?, partitions, key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), // as ref: https://iceberg.apache.org/spec/#partitioning @@ -850,66 +931,101 @@ pub(super) mod _serde { sequence_number: value.sequence_number, min_sequence_number: value.min_sequence_number, added_snapshot_id: value.added_snapshot_id, - added_data_files_count: value.added_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "added_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - existing_data_files_count: value.existing_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "existing_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - deleted_data_files_count: value.deleted_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "deleted_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - added_rows_count: value.added_rows_count.ok_or_else(|| { - Error::new( - 
-                    crate::ErrorKind::DataInvalid,
-                    "added_rows_count in ManifestListEntryV2 should be require",
-                )
-            })?,
-            existing_rows_count: value.existing_rows_count.ok_or_else(|| {
-                Error::new(
-                    crate::ErrorKind::DataInvalid,
-                    "existing_rows_count in ManifestListEntryV2 should be require",
-                )
-            })?,
-            deleted_rows_count: value.deleted_rows_count.ok_or_else(|| {
-                Error::new(
-                    crate::ErrorKind::DataInvalid,
-                    "deleted_rows_count in ManifestListEntryV2 should be require",
-                )
-            })?,
+            added_data_files_count: value
+                .added_data_files_count
+                .ok_or_else(|| {
+                    Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "added_data_files_count in ManifestListEntryV2 should be required",
+                    )
+                })?
+                .try_into()?,
+            existing_data_files_count: value
+                .existing_data_files_count
+                .ok_or_else(|| {
+                    Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "existing_data_files_count in ManifestListEntryV2 should be required",
+                    )
+                })?
+                .try_into()?,
+            deleted_data_files_count: value
+                .deleted_data_files_count
+                .ok_or_else(|| {
+                    Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "deleted_data_files_count in ManifestListEntryV2 should be required",
+                    )
+                })?
+                .try_into()?,
+            added_rows_count: value
+                .added_rows_count
+                .ok_or_else(|| {
+                    Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "added_rows_count in ManifestListEntryV2 should be required",
+                    )
+                })?
+                .try_into()?,
+            existing_rows_count: value
+                .existing_rows_count
+                .ok_or_else(|| {
+                    Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "existing_rows_count in ManifestListEntryV2 should be required",
+                    )
+                })?
+                .try_into()?,
+            deleted_rows_count: value
+                .deleted_rows_count
+                .ok_or_else(|| {
+                    Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "deleted_rows_count in ManifestListEntryV2 should be required",
+                    )
+                })?
+                .try_into()?,
            partitions,
            key_metadata,
        })
    }
}

-    impl From<ManifestListEntry> for ManifestListEntryV1 {
-        fn from(value: ManifestListEntry) -> Self {
+    impl TryFrom<ManifestListEntry> for ManifestListEntryV1 {
+        type Error = Error;
+
+        fn try_from(value: ManifestListEntry) -> Result<Self, Self::Error> {
            let partitions = convert_to_serde_field_summary(value.partitions);
            let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
-            Self {
+            Ok(Self {
                manifest_path: value.manifest_path,
                manifest_length: value.manifest_length,
                partition_spec_id: value.partition_spec_id,
                added_snapshot_id: value.added_snapshot_id,
-                added_data_files_count: value.added_data_files_count,
-                existing_data_files_count: value.existing_data_files_count,
-                deleted_data_files_count: value.deleted_data_files_count,
-                added_rows_count: value.added_rows_count,
-                existing_rows_count: value.existing_rows_count,
-                deleted_rows_count: value.deleted_rows_count,
+                added_data_files_count: value
+                    .added_data_files_count
+                    .map(TryInto::try_into)
+                    .transpose()?,
+                existing_data_files_count: value
+                    .existing_data_files_count
+                    .map(TryInto::try_into)
+                    .transpose()?,
+                deleted_data_files_count: value
+                    .deleted_data_files_count
+                    .map(TryInto::try_into)
+                    .transpose()?,
+                added_rows_count: value.added_rows_count.map(TryInto::try_into).transpose()?,
+                existing_rows_count: value
+                    .existing_rows_count
+                    .map(TryInto::try_into)
+                    .transpose()?,
+                deleted_rows_count: value
+                    .deleted_rows_count
+                    .map(TryInto::try_into)
+                    .transpose()?,
                partitions,
                key_metadata,
-            }
+            })
        }
    }
}
@@ -923,9 +1039,9 @@ mod test {
    use crate::{
        io::FileIOBuilder,
        spec::{
-            manifest_list::_serde::ManifestListV1, FieldSummary, Literal, ManifestContentType,
-            ManifestList, ManifestListEntry, ManifestListWriter, NestedField, PrimitiveType,
-            StructType, Type,
+            manifest_list::{_serde::ManifestListV1, UNASSIGNED_SEQUENCE_NUMBER},
+            FieldSummary, Literal, ManifestContentType, ManifestList, ManifestListEntry,
+            ManifestListWriter, NestedField, PrimitiveType, StructType, Type,
        },
    };

@@ -1033,7 +1149,7 @@ mod test {
            partitions: vec![],
            key_metadata: vec![],
        }]
-    }.into();
+    }.try_into().unwrap();
    let result = serde_json::to_string(&manifest_list).unwrap();
    assert_eq!(
        result,
@@ -1120,15 +1236,17 @@ mod test {

    #[tokio::test]
    async fn test_manifest_list_writer_v2() {
-        let expected_manifest_list = ManifestList {
+        let snapshot_id = 377075049360453639;
+        let seq_num = 1;
+        let mut expected_manifest_list = ManifestList {
            entries: vec![ManifestListEntry {
                manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(),
                manifest_length: 6926,
                partition_spec_id: 1,
                content: ManifestContentType::Data,
-                sequence_number: 1,
-                min_sequence_number: 1,
-                added_snapshot_id: 377075049360453639,
+                sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
+                added_snapshot_id: snapshot_id,
                added_data_files_count: Some(1),
                existing_data_files_count: Some(0),
                deleted_data_files_count: Some(0),
@@ -1145,7 +1263,7 @@ mod test {

        let io = FileIOBuilder::new_fs_io().build().unwrap();
        let output_file = io.new_output(path.to_str().unwrap()).unwrap();
-        let mut writer = ManifestListWriter::v2(output_file, 377075049360453639, 0, 1);
+        let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num);
        writer
            .add_manifest_entries(expected_manifest_list.entries.clone().into_iter())
            .unwrap();
@@ -1162,6 +1280,8 @@ mod test {
            ))]),
        )
        .unwrap();
+        expected_manifest_list.entries[0].sequence_number = seq_num;
+        expected_manifest_list.entries[0].min_sequence_number = seq_num;

        assert_eq!(manifest_list, expected_manifest_list);

        temp_dir.close().unwrap();
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 33d8bf958..199fc4a16 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -18,6 +18,7 @@
//! Spec for Iceberg.

mod datatypes;
+mod manifest;
mod manifest_list;
mod partition;
mod schema;
@@ -28,6 +29,7 @@ mod transform;
mod values;

pub use datatypes::*;
+pub use manifest::*;
pub use manifest_list::*;
pub use partition::*;
pub use schema::*;
diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs
index 484ec7e56..9388820a2 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -22,7 +22,9 @@ use serde::{Deserialize, Serialize};
use std::sync::Arc;
use typed_builder::TypedBuilder;

-use super::transform::Transform;
+use crate::{Error, ErrorKind};
+
+use super::{transform::Transform, NestedField, Schema, StructType};

/// Reference to [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
@@ -69,6 +71,30 @@ impl PartitionSpec {
            .iter()
            .all(|f| matches!(f.transform, Transform::Void))
    }
+
+    /// Returns the partition type of this partition spec.
+    pub fn partition_type(&self, schema: &Schema) -> Result<StructType, Error> {
+        let mut fields = Vec::with_capacity(self.fields.len());
+        for partition_field in &self.fields {
+            let field = schema
+                .field_by_id(partition_field.source_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "No column with source column id {} in schema {:?}",
+                            partition_field.source_id, schema
+                        ),
+                    )
+                })?;
+            let res_type = partition_field.transform.result_type(&field.field_type)?;
+            let field =
+                NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
+                    .into();
+            fields.push(field);
+        }
+        Ok(StructType::new(fields))
+    }
}

/// Reference to [`UnboundPartitionSpec`].
@@ -111,6 +137,8 @@ impl UnboundPartitionSpec {

#[cfg(test)]
mod tests {
+    use crate::spec::Type;
+
    use super::*;

    #[test]
@@ -276,4 +304,189 @@ mod tests {
        assert_eq!("ts_day", partition_spec.fields[0].name);
        assert_eq!(Transform::Day, partition_spec.fields[0].transform);
    }
+
+    #[test]
+    fn test_partition_type() {
+        let spec = r#"
+        {
+            "spec-id": 1,
+            "fields": [ {
+                "source-id": 4,
+                "field-id": 1000,
+                "name": "ts_day",
+                "transform": "day"
+            }, {
+                "source-id": 1,
+                "field-id": 1001,
+                "name": "id_bucket",
+                "transform": "bucket[16]"
+            }, {
+                "source-id": 2,
+                "field-id": 1002,
+                "name": "id_truncate",
+                "transform": "truncate[4]"
+            } ]
+        }
+        "#;
+
+        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+                NestedField::required(
+                    3,
+                    "ts",
+                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
+                )
+                .into(),
+                NestedField::required(
+                    4,
+                    "ts_day",
+                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
+                )
+                .into(),
+                NestedField::required(
+                    5,
+                    "id_bucket",
+                    Type::Primitive(crate::spec::PrimitiveType::Int),
+                )
+                .into(),
+                NestedField::required(
+                    6,
+                    "id_truncate",
+                    Type::Primitive(crate::spec::PrimitiveType::Int),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_type = partition_spec.partition_type(&schema).unwrap();
+        assert_eq!(3, partition_type.fields().len());
+        assert_eq!(
+            *partition_type.fields()[0],
+            NestedField::optional(
+                partition_spec.fields[0].field_id,
+                &partition_spec.fields[0].name,
+                Type::Primitive(crate::spec::PrimitiveType::Int)
+            )
+        );
+        assert_eq!(
+            *partition_type.fields()[1],
+            NestedField::optional(
+                partition_spec.fields[1].field_id,
+                &partition_spec.fields[1].name,
+                Type::Primitive(crate::spec::PrimitiveType::Int)
+            )
+        );
+        assert_eq!(
+            *partition_type.fields()[2],
+            NestedField::optional(
+                partition_spec.fields[2].field_id,
+                &partition_spec.fields[2].name,
+                Type::Primitive(crate::spec::PrimitiveType::String)
+            )
+        );
+    }
+
+    #[test]
+    fn test_partition_empty() {
+        let spec = r#"
+        {
+            "spec-id": 1,
+            "fields": []
+        }
+        "#;
+
+        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+                NestedField::required(
+                    3,
+                    "ts",
+                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
+                )
+                .into(),
+                NestedField::required(
+                    4,
+                    "ts_day",
+                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
+                )
+                .into(),
+                NestedField::required(
+                    5,
+                    "id_bucket",
+                    Type::Primitive(crate::spec::PrimitiveType::Int),
+                )
+                .into(),
+                NestedField::required(
+                    6,
+                    "id_truncate",
+                    Type::Primitive(crate::spec::PrimitiveType::Int),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_type = partition_spec.partition_type(&schema).unwrap();
+        assert_eq!(0, partition_type.fields().len());
+    }
+
+    #[test]
+    fn test_partition_error() {
+        let spec = r#"
+        {
+            "spec-id": 1,
+            "fields": [ {
+                "source-id": 4,
+                "field-id": 1000,
+                "name": "ts_day",
+                "transform": "day"
+            }, {
+                "source-id": 1,
+                "field-id": 1001,
+                "name": "id_bucket",
+                "transform": "bucket[16]"
+            }, {
+                "source-id": 2,
+                "field-id": 1002,
+                "name": "id_truncate",
+                "transform": "truncate[4]"
+            } ]
+        }
+        "#;
+
+        let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        assert!(partition_spec.partition_type(&schema).is_err());
+    }
}
diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs
index 3aa1f8b5b..724498b45 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -686,7 +686,6 @@ pub(super) mod _serde {
    impl TryFrom<SchemaV2> for Schema {
        type Error = Error;
        fn try_from(value: SchemaV2) -> Result<Self, Self::Error> {
-            dbg!(&value);
            Schema::builder()
                .with_schema_id(value.schema_id)
                .with_fields(value.fields.fields().iter().cloned())
diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs
index b66e059ed..839d582dc 100644
--- a/crates/iceberg/src/spec/transform.rs
+++ b/crates/iceberg/src/spec/transform.rs
@@ -126,17 +126,20 @@ pub enum Transform {
impl Transform {
    /// Get the return type of transform given the input type.
-    /// Returns `None` if it can't be transformed.
-    pub fn result_type(&self, input_type: &Type) -> Option<Type> {
+    /// Returns an `Err` if the input type can't be transformed.
+    pub fn result_type(&self, input_type: &Type) -> Result<Type, Error> {
        match self {
            Transform::Identity => {
                if matches!(input_type, Type::Primitive(_)) {
-                    Some(input_type.clone())
+                    Ok(input_type.clone())
                } else {
-                    None
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("{input_type} is not a valid input type of identity transform",),
+                    ))
                }
            }
-            Transform::Void => Some(input_type.clone()),
-            Transform::Unknown => Some(Type::Primitive(PrimitiveType::String)),
+            Transform::Void => Ok(input_type.clone()),
+            Transform::Unknown => Ok(Type::Primitive(PrimitiveType::String)),
            Transform::Bucket(_) => {
                if let Type::Primitive(p) = input_type {
                    match p {
@@ -150,11 +153,17 @@ impl Transform {
                        | PrimitiveType::String
                        | PrimitiveType::Uuid
                        | PrimitiveType::Fixed(_)
-                        | PrimitiveType::Binary => Some(Type::Primitive(PrimitiveType::Int)),
-                        _ => None,
+                        | PrimitiveType::Binary => Ok(Type::Primitive(PrimitiveType::Int)),
+                        _ => Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("{input_type} is not a valid input type of bucket transform",),
+                        )),
                    }
                } else {
-                    None
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("{input_type} is not a valid input type of bucket transform",),
+                    ))
                }
            }
            Transform::Truncate(_) => {
                if let Type::Primitive(p) = input_type {
                    match p {
                        PrimitiveType::Int
@@ -164,11 +173,17 @@ impl Transform {
                        | PrimitiveType::Long
                        | PrimitiveType::String
                        | PrimitiveType::Binary
-                        | PrimitiveType::Decimal { .. } => Some(input_type.clone()),
-                        _ => None,
+                        | PrimitiveType::Decimal { .. } => Ok(input_type.clone()),
+                        _ => Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("{input_type} is not a valid input type of truncate transform",),
+                        )),
                    }
                } else {
-                    None
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("{input_type} is not a valid input type of truncate transform",),
+                    ))
                }
            }
            Transform::Year | Transform::Month | Transform::Day => {
@@ -176,23 +191,35 @@ impl Transform {
                if let Type::Primitive(p) = input_type {
                    match p {
                        PrimitiveType::Timestamp
                        | PrimitiveType::Timestamptz
-                        | PrimitiveType::Date => Some(Type::Primitive(PrimitiveType::Int)),
-                        _ => None,
+                        | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)),
+                        _ => Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("{input_type} is not a valid input type of {self} transform",),
+                        )),
                    }
                } else {
-                    None
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("{input_type} is not a valid input type of {self} transform",),
+                    ))
                }
            }
            Transform::Hour => {
                if let Type::Primitive(p) = input_type {
                    match p {
                        PrimitiveType::Timestamp | PrimitiveType::Timestamptz => {
-                            Some(Type::Primitive(PrimitiveType::Int))
+                            Ok(Type::Primitive(PrimitiveType::Int))
                        }
-                        _ => None,
+                        _ => Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("{input_type} is not a valid input type of {self} transform",),
+                        )),
                    }
                } else {
-                    None
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("{input_type} is not a valid input type of {self} transform",),
+                    ))
                }
            }
        }
    }
@@ -367,7 +394,7 @@ mod tests {
        }

        for (input_type, result_type) in param.trans_types {
-            assert_eq!(result_type, trans.result_type(&input_type));
+            assert_eq!(result_type, trans.result_type(&input_type).ok());
        }
    }
diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index bd9f2e848..de3da0497 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -34,7 +34,6 @@ use uuid::Uuid;
use crate::{Error, ErrorKind};

use super::datatypes::{PrimitiveType, Type};
-use super::MAX_DECIMAL_PRECISION;

pub use _serde::RawLiteral;

@@ -609,6 +608,16 @@ pub struct Struct {
}

impl Struct {
+    /// Create an empty struct.
+    pub fn empty() -> Self {
+        Self {
+            fields: Vec::new(),
+            field_ids: Vec::new(),
+            field_names: Vec::new(),
+            null_bitmap: BitVec::new(),
+        }
+    }
+
    /// Create an iterator to read the fields in order of (field_id, field_value, field_name).
    pub fn iter(&self) -> impl Iterator<Item = (i32, Option<&Literal>, &str)> {
        self.null_bitmap
@@ -906,34 +915,6 @@ impl Literal {
        }
    }

-    /// Get datatype of value
-    pub fn datatype(&self) -> Type {
-        match self {
-            Literal::Primitive(prim) => match prim {
-                PrimitiveLiteral::Boolean(_) => Type::Primitive(PrimitiveType::Boolean),
-                PrimitiveLiteral::Int(_) => Type::Primitive(PrimitiveType::Int),
-                PrimitiveLiteral::Long(_) => Type::Primitive(PrimitiveType::Long),
-                PrimitiveLiteral::Float(_) => Type::Primitive(PrimitiveType::Float),
-                PrimitiveLiteral::Double(_) => Type::Primitive(PrimitiveType::Double),
-                PrimitiveLiteral::Date(_) => Type::Primitive(PrimitiveType::Date),
-                PrimitiveLiteral::Time(_) => Type::Primitive(PrimitiveType::Time),
-                PrimitiveLiteral::Timestamp(_) => Type::Primitive(PrimitiveType::Timestamp),
-                PrimitiveLiteral::TimestampTZ(_) => Type::Primitive(PrimitiveType::Timestamptz),
-                PrimitiveLiteral::Fixed(vec) => {
-                    Type::Primitive(PrimitiveType::Fixed(vec.len() as u64))
-                }
-                PrimitiveLiteral::Binary(_) => Type::Primitive(PrimitiveType::Binary),
-                PrimitiveLiteral::String(_) => Type::Primitive(PrimitiveType::String),
-                PrimitiveLiteral::UUID(_) => Type::Primitive(PrimitiveType::Uuid),
-                PrimitiveLiteral::Decimal(_) => Type::Primitive(PrimitiveType::Decimal {
-                    precision: MAX_DECIMAL_PRECISION,
-                    scale: 0,
-                }),
-            },
-            _ => unimplemented!(),
-        }
-    }
-
    /// Convert Value to the any type
    pub fn into_any(self) -> Box<dyn Any> {
        match self {
diff --git a/crates/iceberg/testdata/partition_manifest_v1.avro b/crates/iceberg/testdata/partition_manifest_v1.avro
new file mode 100644
index 000000000..d1ada6188
Binary files /dev/null and b/crates/iceberg/testdata/partition_manifest_v1.avro differ
diff --git a/crates/iceberg/testdata/partition_manifest_v2.avro b/crates/iceberg/testdata/partition_manifest_v2.avro
new file mode 100644
index 000000000..9d10a8711
Binary files /dev/null and b/crates/iceberg/testdata/partition_manifest_v2.avro differ
diff --git a/crates/iceberg/testdata/unpartition_manifest_v1.avro b/crates/iceberg/testdata/unpartition_manifest_v1.avro
new file mode 100644
index 000000000..a4ef37646
Binary files /dev/null and b/crates/iceberg/testdata/unpartition_manifest_v1.avro differ
diff --git a/crates/iceberg/testdata/unpartition_manifest_v2.avro b/crates/iceberg/testdata/unpartition_manifest_v2.avro
new file mode 100644
index 000000000..7652159f3
Binary files /dev/null and b/crates/iceberg/testdata/unpartition_manifest_v2.avro differ
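To close, a short sketch of how the new `PartitionSpec::partition_type` composes with the reworked `Transform::result_type` (illustrative only; it mirrors the test fixtures above and assumes the `iceberg::spec` re-exports, a `serde_json` dependency, and that `iceberg::Error` implements `std::error::Error`):

use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, Type};

fn partition_type_demo() -> Result<(), Box<dyn std::error::Error>> {
    // A one-column schema and a spec that buckets that column.
    let schema = Schema::builder()
        .with_fields(vec![NestedField::required(
            1,
            "id",
            Type::Primitive(PrimitiveType::Int),
        )
        .into()])
        .build()?;
    let spec: PartitionSpec = serde_json::from_str(
        r#"{"spec-id": 1, "fields": [{"source-id": 1, "field-id": 1000, "name": "id_bucket", "transform": "bucket[16]"}]}"#,
    )?;

    // Each partition field becomes an optional struct field whose type is the
    // transform's result type: bucket[16] over an int yields an int.
    let partition_type = spec.partition_type(&schema)?;
    assert_eq!(1, partition_type.fields().len());
    Ok(())
}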