From 22e07d861bdaf4b39c83535b9c91b58775db34ee Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 30 Nov 2023 23:24:39 +0800 Subject: [PATCH 1/7] support read/write Manifest --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/spec/manifest.rs | 1847 +++++++++++++++++ crates/iceberg/src/spec/manifest_list.rs | 73 +- crates/iceberg/src/spec/mod.rs | 2 + crates/iceberg/src/spec/partition.rs | 28 +- crates/iceberg/src/spec/schema.rs | 1 - crates/iceberg/src/spec/transform.rs | 63 +- crates/iceberg/src/spec/values.rs | 39 +- .../testdata/partition_manifest_v1.avro | Bin 0 -> 6067 bytes .../testdata/partition_manifest_v2.avro | Bin 0 -> 3350 bytes .../testdata/unpartition_manifest_v1.avro | Bin 0 -> 6032 bytes .../testdata/unpartition_manifest_v2.avro | Bin 0 -> 3095 bytes 13 files changed, 1983 insertions(+), 72 deletions(-) create mode 100644 crates/iceberg/src/spec/manifest.rs create mode 100644 crates/iceberg/testdata/partition_manifest_v1.avro create mode 100644 crates/iceberg/testdata/partition_manifest_v2.avro create mode 100644 crates/iceberg/testdata/unpartition_manifest_v1.avro create mode 100644 crates/iceberg/testdata/unpartition_manifest_v2.avro diff --git a/Cargo.toml b/Cargo.toml index 61b82c224..a59a4bb4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ serde_bytes = "0.11.8" serde_derive = "^1.0" serde_json = "^1.0" serde_repr = "0.1.16" +serde_with = "3.4.0" tempfile = "3.8" tokio = { version = "1", features = ["macros"] } typed-builder = "^0.18" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 6d2443ccd..b4867bbe4 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -53,6 +53,7 @@ serde_bytes = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } serde_repr = { workspace = true } +serde_with = { workspace = true } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs new file mode 100644 index 000000000..2a5d58728 --- /dev/null +++ b/crates/iceberg/src/spec/manifest.rs @@ -0,0 +1,1847 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Manifest for Iceberg. 
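+//!
+//! Reading and writing go through [`Manifest::parse_avro`] and
+//! [`ManifestWriter`]. A minimal round-trip sketch (the paths, `snapshot_id`,
+//! and `seq_num` below are placeholders, mirroring the tests at the bottom of
+//! this file):
+//!
+//! ```ignore
+//! let bs = std::fs::read("path/to/manifest.avro")?;
+//! let manifest = Manifest::parse_avro(&bs)?;
+//!
+//! let io = FileIOBuilder::new_fs_io().build()?;
+//! let output_file = io.new_output("path/to/rewritten_manifest.avro")?;
+//! let writer = ManifestWriter::new(output_file, snapshot_id, seq_num, None);
+//! let manifest_list_entry = writer.write(manifest).await?;
+//! ```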
+use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
+
+use super::Literal;
+use super::{
+ FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema,
+ Struct, UNASSIGNED_SEQ_NUMBER,
+};
+use crate::io::OutputFile;
+use crate::spec::PartitionField;
+use crate::{Error, ErrorKind};
+use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
+use futures::AsyncWriteExt;
+use serde_json::to_vec;
+use std::cmp::min;
+use std::collections::HashMap;
+use std::str::FromStr;
+
+/// A manifest contains metadata and a list of entries.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct Manifest {
+ metadata: ManifestMetadata,
+ entries: Vec<ManifestEntry>,
+}
+
+impl Manifest {
+ /// Parse a manifest from the bytes of an Avro file.
+ pub fn parse_avro(bs: &[u8]) -> Result<Self, Error> {
+ let reader = AvroReader::new(bs)?;
+
+ // Parse manifest metadata
+ let meta = reader.user_metadata();
+ let metadata = ManifestMetadata::parse(meta)?;
+
+ // Parse manifest entries
+ let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
+
+ let entries = match metadata.format_version {
+ FormatVersion::V1 => {
+ let schema = manifest_schema_v1(partition_type.clone())?;
+ let reader = AvroReader::with_schema(&schema, bs)?;
+ reader
+ .into_iter()
+ .map(|value| {
+ from_value::<_serde::ManifestEntryV1>(&value?)?
+ .try_into(&partition_type, &metadata.schema)
+ })
+ .collect::<Result<Vec<ManifestEntry>, Error>>()?
+ }
+ FormatVersion::V2 => {
+ let schema = manifest_schema_v2(partition_type.clone())?;
+ let reader = AvroReader::with_schema(&schema, bs)?;
+ reader
+ .into_iter()
+ .map(|value| {
+ from_value::<_serde::ManifestEntryV2>(&value?)?
+ .try_into(&partition_type, &metadata.schema)
+ })
+ .collect::<Result<Vec<ManifestEntry>, Error>>()?
+ }
+ };
+
+ Ok(Manifest { metadata, entries })
+ }
+}
+
+/// A manifest writer.
+pub struct ManifestWriter {
+ output: OutputFile,
+
+ snapshot_id: i64,
+
+ added_files: i32,
+ added_rows: i64,
+ existing_files: i32,
+ existing_rows: i64,
+ deleted_files: i32,
+ deleted_rows: i64,
+
+ seq_num: i64,
+ min_seq_num: Option<i64>,
+
+ key_metadata: Option<Vec<u8>>,
+
+ field_summary: HashMap<i32, FieldSummary>,
+}
+
+impl ManifestWriter {
+ /// Create a new manifest writer.
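+ ///
+ /// `snapshot_id` and `seq_num` are recorded into the resulting
+ /// [`ManifestListEntry`]; `key_metadata` carries optional encryption
+ /// metadata. A construction sketch (arguments are placeholders):
+ ///
+ /// ```ignore
+ /// let writer = ManifestWriter::new(output_file, snapshot_id, seq_num, None);
+ /// ```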
+ pub fn new(
+ output: OutputFile,
+ snapshot_id: i64,
+ seq_num: i64,
+ key_metadata: Option<Vec<u8>>,
+ ) -> Self {
+ Self {
+ output,
+ snapshot_id,
+ added_files: 0,
+ added_rows: 0,
+ existing_files: 0,
+ existing_rows: 0,
+ deleted_files: 0,
+ deleted_rows: 0,
+ seq_num,
+ min_seq_num: None,
+ key_metadata,
+ field_summary: HashMap::new(),
+ }
+ }
+
+ fn update_field_summary(&mut self, entry: &ManifestEntry) {
+ // Update field summary
+ if let Some(null) = &entry.data_file.null_value_counts {
+ for (&k, &v) in null {
+ let field_summary = self.field_summary.entry(k).or_default();
+ if v > 0 {
+ field_summary.contains_null = true;
+ }
+ }
+ }
+ if let Some(nan) = &entry.data_file.nan_value_counts {
+ for (&k, &v) in nan {
+ let field_summary = self.field_summary.entry(k).or_default();
+ assert!(v >= 0);
+ if v > 0 {
+ field_summary.contains_nan = Some(true);
+ }
+ if v == 0 {
+ field_summary.contains_nan = Some(false);
+ }
+ }
+ }
+ if let Some(lower_bound) = &entry.data_file.lower_bounds {
+ for (&k, v) in lower_bound {
+ let field_summary = self.field_summary.entry(k).or_default();
+ if let Some(cur) = &field_summary.lower_bound {
+ if v < cur {
+ field_summary.lower_bound = Some(v.clone());
+ }
+ } else {
+ field_summary.lower_bound = Some(v.clone());
+ }
+ }
+ }
+ if let Some(upper_bound) = &entry.data_file.upper_bounds {
+ for (&k, v) in upper_bound {
+ let field_summary = self.field_summary.entry(k).or_default();
+ if let Some(cur) = &field_summary.upper_bound {
+ if v > cur {
+ field_summary.upper_bound = Some(v.clone());
+ }
+ } else {
+ field_summary.upper_bound = Some(v.clone());
+ }
+ }
+ }
+ }
+
+ fn get_field_summary_vec(&mut self, spec_fields: &[PartitionField]) -> Vec<FieldSummary> {
+ let mut partition_summary = Vec::with_capacity(self.field_summary.len());
+ for field in spec_fields {
+ let entry = self
+ .field_summary
+ .remove(&field.source_id)
+ .unwrap_or(FieldSummary::default());
+ partition_summary.push(entry);
+ }
+ partition_summary
+ }
+
+ /// Write the manifest and return a [`ManifestListEntry`] describing the written file.
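+ ///
+ /// Consumes the writer and the manifest, so one writer produces exactly
+ /// one manifest file. A usage sketch (error handling elided):
+ ///
+ /// ```ignore
+ /// let entry = writer.write(manifest).await?;
+ /// println!("wrote {} bytes to {}", entry.manifest_length, entry.manifest_path);
+ /// ```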
+ pub async fn write(mut self, manifest: Manifest) -> Result<ManifestListEntry, Error> {
+ // Create the avro writer
+ let partition_type = manifest
+ .metadata
+ .partition_spec
+ .partition_type(&manifest.metadata.schema)?;
+ let table_schema = &manifest.metadata.schema;
+ let avro_schema = match manifest.metadata.format_version {
+ FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?,
+ FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?,
+ };
+ let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
+ avro_writer.add_user_metadata(
+ "schema".to_string(),
+ to_vec(table_schema).map_err(|err| {
+ Error::new(ErrorKind::DataInvalid, "Fail to serialize table schema")
+ .with_source(err)
+ })?,
+ )?;
+ avro_writer.add_user_metadata(
+ "schema-id".to_string(),
+ table_schema.schema_id().to_string(),
+ )?;
+ avro_writer.add_user_metadata(
+ "partition-spec".to_string(),
+ to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| {
+ Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
+ .with_source(err)
+ })?,
+ )?;
+ avro_writer.add_user_metadata(
+ "partition-spec-id".to_string(),
+ manifest.metadata.partition_spec.spec_id.to_string(),
+ )?;
+ avro_writer.add_user_metadata(
+ "format-version".to_string(),
+ manifest.metadata.format_version.to_string(),
+ )?;
+ if manifest.metadata.format_version == FormatVersion::V2 {
+ avro_writer
+ .add_user_metadata("content".to_string(), manifest.metadata.content.to_string())?;
+ }
+
+ // Write manifest entries
+ for entry in manifest.entries {
+ match entry.status {
+ ManifestStatus::Added => {
+ self.added_files += 1;
+ self.added_rows += entry.data_file.record_count;
+ }
+ ManifestStatus::Deleted => {
+ self.deleted_files += 1;
+ self.deleted_rows += entry.data_file.record_count;
+ }
+ ManifestStatus::Existing => {
+ self.existing_files += 1;
+ self.existing_rows += entry.data_file.record_count;
+ }
+ }
+
+ if entry.is_alive() {
+ if let Some(cur_min_seq_num) = self.min_seq_num {
+ self.min_seq_num = Some(
+ entry
+ .sequence_number
+ .map(|v| min(v, cur_min_seq_num))
+ .unwrap_or(cur_min_seq_num),
+ );
+ } else {
+ self.min_seq_num = entry.sequence_number;
+ }
+ }
+
+ self.update_field_summary(&entry);
+
+ let value = match manifest.metadata.format_version {
+ FormatVersion::V1 => {
+ to_value(_serde::ManifestEntryV1::try_from(entry, &partition_type)?)?
+ .resolve(&avro_schema)?
+ }
+ FormatVersion::V2 => {
+ to_value(_serde::ManifestEntryV2::try_from(entry, &partition_type)?)?
+ .resolve(&avro_schema)?
+ }
+ };
+
+ avro_writer.append(value)?;
+ }
+
+ let length = avro_writer.flush()?;
+ let content = avro_writer.into_inner()?;
+ let mut writer = self.output.writer().await?;
+ writer.write_all(&content).await.map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "Fail to write Manifest Entry").with_source(err)
+ })?;
+ writer.close().await.map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "Fail to write Manifest Entry").with_source(err)
+ })?;
+
+ let partition_summary =
+ self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
+
+ Ok(ManifestListEntry {
+ manifest_path: self.output.location().to_string(),
+ manifest_length: length as i64,
+ partition_spec_id: manifest.metadata.partition_spec.spec_id,
+ content: manifest.metadata.content,
+ sequence_number: self.seq_num,
+ min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQ_NUMBER),
+ added_snapshot_id: self.snapshot_id,
+ added_data_files_count: Some(self.added_files),
+ existing_data_files_count: Some(self.existing_files),
+ deleted_data_files_count: Some(self.deleted_files),
+ added_rows_count: Some(self.added_rows),
+ existing_rows_count: Some(self.existing_rows),
+ deleted_rows_count: Some(self.deleted_rows),
+ partitions: partition_summary,
+ key_metadata: self.key_metadata.unwrap_or_default(),
+ })
+ }
+}
+
+/// This is a helper module that defines the Avro schema fields of a manifest entry.
+mod _const_schema {
+ use std::sync::Arc;
+
+ use apache_avro::Schema as AvroSchema;
+ use once_cell::sync::Lazy;
+
+ use crate::{
+ avro::schema_to_avro_schema,
+ spec::{
+ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type,
+ },
+ Error,
+ };
+
+ pub static STATUS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::required(
+ 0,
+ "status",
+ Type::Primitive(PrimitiveType::Int),
+ ))
+ })
+ };
+
+ pub static SNAPSHOT_ID_V1: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::required(
+ 1,
+ "snapshot_id",
+ Type::Primitive(PrimitiveType::Long),
+ ))
+ })
+ };
+
+ pub static SNAPSHOT_ID_V2: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 1,
+ "snapshot_id",
+ Type::Primitive(PrimitiveType::Long),
+ ))
+ })
+ };
+
+ pub static SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 3,
+ "sequence_number",
+ Type::Primitive(PrimitiveType::Long),
+ ))
+ })
+ };
+
+ pub static FILE_SEQUENCE_NUMBER: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 4,
+ "file_sequence_number",
+ Type::Primitive(PrimitiveType::Long),
+ ))
+ })
+ };
+
+ pub static CONTENT: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::required(
+ 134,
+ "content",
+ Type::Primitive(PrimitiveType::Int),
+ ))
+ })
+ };
+
+ pub static FILE_PATH: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::required(
+ 100,
+ "file_path",
+ Type::Primitive(PrimitiveType::String),
+ ))
+ })
+ };
+
+ pub static FILE_FORMAT: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::required(
+ 101,
+ "file_format",
+ Type::Primitive(PrimitiveType::String),
+ ))
+ })
+ };
+
+ pub static RECORD_COUNT: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::required(
+ 103,
+ "record_count",
+ Type::Primitive(PrimitiveType::Long),
+ ))
+ })
+ };
+
+ pub static FILE_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::required(
+ 104,
+ "file_size_in_bytes",
+ Type::Primitive(PrimitiveType::Long),
+ ))
+ })
+ };
+
+ // Deprecated. Always write a default in v1. Do not write in v2.
+ pub static BLOCK_SIZE_IN_BYTES: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::required(
+ 105,
+ "block_size_in_bytes",
+ Type::Primitive(PrimitiveType::Long),
+ ))
+ })
+ };
+
+ pub static COLUMN_SIZES: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 108,
+ "column_sizes",
+ Type::Map(MapType {
+ key_field: Arc::new(NestedField::required(
+ 117,
+ "key",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ value_field: Arc::new(NestedField::required(
+ 118,
+ "value",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static VALUE_COUNTS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 109,
+ "value_counts",
+ Type::Map(MapType {
+ key_field: Arc::new(NestedField::required(
+ 119,
+ "key",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ value_field: Arc::new(NestedField::required(
+ 120,
+ "value",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static NULL_VALUE_COUNTS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 110,
+ "null_value_counts",
+ Type::Map(MapType {
+ key_field: Arc::new(NestedField::required(
+ 121,
+ "key",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ value_field: Arc::new(NestedField::required(
+ 122,
+ "value",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static NAN_VALUE_COUNTS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 137,
+ "nan_value_counts",
+ Type::Map(MapType {
+ key_field: Arc::new(NestedField::required(
+ 138,
+ "key",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ value_field: Arc::new(NestedField::required(
+ 139,
+ "value",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static DISTINCT_COUNTS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 111,
+ "distinct_counts",
+ Type::Map(MapType {
+ key_field: Arc::new(NestedField::required(
+ 123,
+ "key",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ value_field: Arc::new(NestedField::required(
+ 124,
+ "value",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static LOWER_BOUNDS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 125,
+ "lower_bounds",
+ Type::Map(MapType {
+ key_field: Arc::new(NestedField::required(
+ 126,
+ "key",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ value_field: Arc::new(NestedField::required(
+ 127,
+ "value",
+ Type::Primitive(PrimitiveType::Binary),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static UPPER_BOUNDS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 128,
+ "upper_bounds",
+ Type::Map(MapType {
+ key_field: Arc::new(NestedField::required(
+ 129,
+ "key",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ value_field: Arc::new(NestedField::required(
+ 130,
+ "value",
+ Type::Primitive(PrimitiveType::Binary),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static KEY_METADATA: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 131,
+ "key_metadata",
+ Type::Primitive(PrimitiveType::Binary),
+ ))
+ })
+ };
+
+ pub static SPLIT_OFFSETS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 132,
+ "split_offsets",
+ Type::List(ListType {
+ element_field: Arc::new(NestedField::required(
+ 133,
+ "element",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static EQUALITY_IDS: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 135,
+ "equality_ids",
+ Type::List(ListType {
+ element_field: Arc::new(NestedField::required(
+ 136,
+ "element",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ }),
+ ))
+ })
+ };
+
+ pub static SORT_ORDER_ID: Lazy<NestedFieldRef> = {
+ Lazy::new(|| {
+ Arc::new(NestedField::optional(
+ 140,
+ "sort_order_id",
+ Type::Primitive(PrimitiveType::Int),
+ ))
+ })
+ };
+
+ pub fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
+ let fields = vec![
+ STATUS.clone(),
+ SNAPSHOT_ID_V2.clone(),
+ SEQUENCE_NUMBER.clone(),
+ FILE_SEQUENCE_NUMBER.clone(),
+ Arc::new(NestedField::required(
+ 2,
+ "data_file",
+ Type::Struct(StructType::new(vec![
+ CONTENT.clone(),
+ FILE_PATH.clone(),
+ FILE_FORMAT.clone(),
+ Arc::new(NestedField::required(
+ 102,
+ "partition",
+ Type::Struct(partition_type),
+ )),
+ RECORD_COUNT.clone(),
+ FILE_SIZE_IN_BYTES.clone(),
+ COLUMN_SIZES.clone(),
+ VALUE_COUNTS.clone(),
+ NULL_VALUE_COUNTS.clone(),
+ NAN_VALUE_COUNTS.clone(),
+ DISTINCT_COUNTS.clone(),
+ LOWER_BOUNDS.clone(),
+ UPPER_BOUNDS.clone(),
+ KEY_METADATA.clone(),
+ SPLIT_OFFSETS.clone(),
+ EQUALITY_IDS.clone(),
+ SORT_ORDER_ID.clone(),
+ ])),
+ )),
+ ];
+ let schema = Schema::builder().with_fields(fields).build().unwrap();
+ schema_to_avro_schema("manifest", &schema)
+ }
+
+ pub fn manifest_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
+ let fields = vec![
+ STATUS.clone(),
+ SNAPSHOT_ID_V1.clone(),
+ Arc::new(NestedField::required(
+ 2,
+ "data_file",
+ Type::Struct(StructType::new(vec![
+ FILE_PATH.clone(),
+ FILE_FORMAT.clone(),
+ Arc::new(NestedField::required(
+ 102,
+ "partition",
+ Type::Struct(partition_type),
+ )),
+ RECORD_COUNT.clone(),
+ FILE_SIZE_IN_BYTES.clone(),
+ BLOCK_SIZE_IN_BYTES.clone(),
+ COLUMN_SIZES.clone(),
+ VALUE_COUNTS.clone(),
+ NULL_VALUE_COUNTS.clone(),
+ NAN_VALUE_COUNTS.clone(),
+ DISTINCT_COUNTS.clone(),
+ LOWER_BOUNDS.clone(),
+ UPPER_BOUNDS.clone(),
+ KEY_METADATA.clone(),
+ SPLIT_OFFSETS.clone(),
+ SORT_ORDER_ID.clone(),
+ ])),
+ )),
+ ];
+ let schema = Schema::builder().with_fields(fields).build().unwrap();
+ schema_to_avro_schema("manifest", &schema)
+ }
+}
+
+/// Metadata of a manifest that is stored in the key-value metadata of the Avro file
+#[derive(Debug, PartialEq, Clone, Eq)]
+pub struct ManifestMetadata {
+ /// The table schema at the time the manifest
+ /// was written
+ schema: Schema,
+ /// ID of the schema used to write the manifest as a string
+ schema_id: i32,
+ /// The partition spec used to write the manifest
+ partition_spec: PartitionSpec,
+ /// Table format version number of the manifest as a string
+ format_version: FormatVersion,
+ /// Type of content files tracked by the manifest: “data” or “deletes”
+ content: ManifestContentType,
+}
+
+impl ManifestMetadata {
+ /// Parse from the key-value metadata of an Avro file.
+ pub fn parse(meta: &HashMap<String, Vec<u8>>) -> Result<Self, Error> {
+ let schema = {
+ let bs = meta.get("schema").ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "schema is required in manifest metadata but not found",
+ )
+ })?;
+ serde_json::from_slice::<Schema>(bs).map_err(|err| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Fail to parse schema in manifest metadata",
+ )
+ .with_source(err)
+ })?
+ };
+ let schema_id: i32 = meta
+ .get("schema-id")
+ .map(|bs| {
+ String::from_utf8_lossy(bs).parse().map_err(|err| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Fail to parse schema id in manifest metadata",
+ )
+ .with_source(err)
+ })
+ })
+ .transpose()?
+ .unwrap_or(0);
+ let partition_spec = {
+ let fields = {
+ let bs = meta.get("partition-spec").ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "partition-spec is required in manifest metadata but not found",
+ )
+ })?;
+ serde_json::from_slice::<Vec<PartitionField>>(bs).map_err(|err| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Fail to parse partition spec in manifest metadata",
+ )
+ .with_source(err)
+ })?
+ };
+ let spec_id = meta
+ .get("partition-spec-id")
+ .map(|bs| {
+ String::from_utf8_lossy(bs).parse().map_err(|err| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Fail to parse partition spec id in manifest metadata",
+ )
+ .with_source(err)
+ })
+ })
+ .transpose()?
+ .unwrap_or(0);
+ PartitionSpec { spec_id, fields }
+ };
+ let format_version = if let Some(bs) = meta.get("format-version") {
+ serde_json::from_slice::<FormatVersion>(bs).map_err(|err| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Fail to parse format version in manifest metadata",
+ )
+ .with_source(err)
+ })?
+ } else {
+ FormatVersion::V1
+ };
+ let content = if let Some(v) = meta.get("content") {
+ let v = String::from_utf8_lossy(v);
+ v.parse()?
+ } else {
+ ManifestContentType::Data
+ };
+ Ok(ManifestMetadata {
+ schema,
+ schema_id,
+ partition_spec,
+ format_version,
+ content,
+ })
+ }
+}
+
+/// A manifest is an immutable Avro file that lists data files or delete
+/// files, along with each file’s partition data tuple, metrics, and tracking
+/// information.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct ManifestEntry {
+ /// field: 0
+ ///
+ /// Used to track additions and deletions.
+ status: ManifestStatus,
+ /// field id: 1
+ ///
+ /// Snapshot id where the file was added, or deleted if status is 2.
+ /// Inherited when null.
+ snapshot_id: Option<i64>,
+ /// field id: 3
+ ///
+ /// Data sequence number of the file.
+ /// Inherited when null and status is 1 (added).
+ sequence_number: Option<i64>,
+ /// field id: 4
+ ///
+ /// File sequence number indicating when the file was added.
+ /// Inherited when null and status is 1 (added).
+ file_sequence_number: Option<i64>,
+ /// field id: 2
+ ///
+ /// File path, partition tuple, metrics, …
+ data_file: DataFile,
+}
+
+impl ManifestEntry {
+ /// Check if this manifest entry is alive, i.e. has status `Added` or `Existing`.
+ pub fn is_alive(&self) -> bool {
+ matches!(
+ self.status,
+ ManifestStatus::Added | ManifestStatus::Existing
+ )
+ }
+}
+
+/// Used to track additions and deletions in ManifestEntry.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum ManifestStatus {
+ /// Value: 0
+ Existing = 0,
+ /// Value: 1
+ Added = 1,
+ /// Value: 2
+ ///
+ /// Deletes are informational only and not used in scans.
+ Deleted = 2,
+}
+
+impl TryFrom<i32> for ManifestStatus {
+ type Error = Error;
+
+ fn try_from(v: i32) -> Result<ManifestStatus, Error> {
+ match v {
+ 0 => Ok(ManifestStatus::Existing),
+ 1 => Ok(ManifestStatus::Added),
+ 2 => Ok(ManifestStatus::Deleted),
+ _ => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("manifest status {} is invalid", v),
+ )),
+ }
+ }
+}
+
+/// Data file carries data file path, partition tuple, metrics, …
+#[derive(Debug, PartialEq, Clone, Eq)]
+pub struct DataFile {
+ /// field id: 134
+ ///
+ /// Type of content stored by the data file: data, equality deletes,
+ /// or position deletes (all v1 files are data files)
+ content: DataContentType,
+ /// field id: 100
+ ///
+ /// Full URI for the file with FS scheme
+ file_path: String,
+ /// field id: 101
+ ///
+ /// String file format name, avro, orc or parquet
+ file_format: DataFileFormat,
+ /// field id: 102
+ ///
+ /// Partition data tuple, schema based on the partition spec output using
+ /// partition field ids for the struct field ids
+ partition: Struct,
+ /// field id: 103
+ ///
+ /// Number of records in this file
+ record_count: i64,
+ /// field id: 104
+ ///
+ /// Total file size in bytes
+ file_size_in_bytes: i64,
+ /// field id: 108
+ /// key field id: 117
+ /// value field id: 118
+ ///
+ /// Map from column id to the total size on disk of all regions that
+ /// store the column. Does not include bytes necessary to read other
+ /// columns, like footers. Leave null for row-oriented formats (Avro)
+ column_sizes: Option<HashMap<i32, i64>>,
+ /// field id: 109
+ /// key field id: 119
+ /// value field id: 120
+ ///
+ /// Map from column id to number of values in the column (including null
+ /// and NaN values)
+ value_counts: Option<HashMap<i32, i64>>,
+ /// field id: 110
+ /// key field id: 121
+ /// value field id: 122
+ ///
+ /// Map from column id to number of null values in the column
+ null_value_counts: Option<HashMap<i32, i64>>,
+ /// field id: 137
+ /// key field id: 138
+ /// value field id: 139
+ ///
+ /// Map from column id to number of NaN values in the column
+ nan_value_counts: Option<HashMap<i32, i64>>,
+ /// field id: 111
+ /// key field id: 123
+ /// value field id: 124
+ ///
+ /// Map from column id to number of distinct values in the column;
+ /// distinct counts must be derived using values in the file by counting
+ /// or using sketches, but not using methods like merging existing
+ /// distinct counts
+ distinct_counts: Option<HashMap<i32, i64>>,
+ /// field id: 125
+ /// key field id: 126
+ /// value field id: 127
+ ///
+ /// Map from column id to lower bound in the column serialized as binary.
+ /// Each value must be less than or equal to all non-null, non-NaN values
+ /// in the column for the file.
+ ///
+ /// Reference:
+ ///
+ /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+ lower_bounds: Option<HashMap<i32, Literal>>,
+ /// field id: 128
+ /// key field id: 129
+ /// value field id: 130
+ ///
+ /// Map from column id to upper bound in the column serialized as binary.
+ /// Each value must be greater than or equal to all non-null, non-NaN
+ /// values in the column for the file.
+ ///
+ /// Reference:
+ ///
+ /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+ upper_bounds: Option<HashMap<i32, Literal>>,
+ /// field id: 131
+ ///
+ /// Implementation-specific key metadata for encryption
+ key_metadata: Option<Vec<u8>>,
+ /// field id: 132
+ /// element field id: 133
+ ///
+ /// Split offsets for the data file. For example, all row group offsets
+ /// in a Parquet file.
+ /// Must be sorted ascending.
+ split_offsets: Option<Vec<i64>>,
+ /// field id: 135
+ /// element field id: 136
+ ///
+ /// Field ids used to determine row equality in equality delete files.
+ /// Required when content is EqualityDeletes and should be null
+ /// otherwise. Fields with ids listed in this column must be present
+ /// in the delete file
+ equality_ids: Option<Vec<i32>>,
+ /// field id: 140
+ ///
+ /// ID representing sort order for this file.
+ ///
+ /// If sort order ID is missing or unknown, then the order is assumed to
+ /// be unsorted. Only data files and equality delete files should be
+ /// written with a non-null order id. Position deletes are required to be
+ /// sorted by file and position, not a table order, and should set sort
+ /// order id to null. Readers must ignore sort order id for position
+ /// delete files.
+ sort_order_id: Option<i32>,
+}
+
+/// Type of content stored by the data file: data, equality deletes, or
+/// position deletes (all v1 files are data files)
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum DataContentType {
+ /// value: 0
+ Data = 0,
+ /// value: 1
+ PositionDeletes = 1,
+ /// value: 2
+ EqualityDeletes = 2,
+}
+
+impl TryFrom<i32> for DataContentType {
+ type Error = Error;
+
+ fn try_from(v: i32) -> Result<DataContentType, Error> {
+ match v {
+ 0 => Ok(DataContentType::Data),
+ 1 => Ok(DataContentType::PositionDeletes),
+ 2 => Ok(DataContentType::EqualityDeletes),
+ _ => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("data content type {} is invalid", v),
+ )),
+ }
+ }
+}
+
+/// Format of this data.
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+pub enum DataFileFormat {
+ /// Avro file format: <https://avro.apache.org/>
+ Avro,
+ /// Orc file format: <https://orc.apache.org/>
+ Orc,
+ /// Parquet file format: <https://parquet.apache.org/>
+ Parquet,
+}
+
+impl FromStr for DataFileFormat {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "avro" => Ok(Self::Avro),
+ "orc" => Ok(Self::Orc),
+ "parquet" => Ok(Self::Parquet),
+ _ => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Unsupported data file format: {}", s),
+ )),
+ }
+ }
+}
+
+impl ToString for DataFileFormat {
+ fn to_string(&self) -> String {
+ match self {
+ DataFileFormat::Avro => "avro",
+ DataFileFormat::Orc => "orc",
+ DataFileFormat::Parquet => "parquet",
+ }
+ .to_string()
+ }
+}
+
+mod _serde {
+ use std::collections::HashMap;
+
+ use serde_bytes::ByteBuf;
+ use serde_derive::{Deserialize, Serialize};
+ use serde_with::serde_as;
+
+ use crate::spec::Literal;
+ use crate::spec::RawLiteral;
+ use crate::spec::Schema;
+ use crate::spec::Struct;
+ use crate::spec::StructType;
+ use crate::spec::Type;
+ use crate::Error;
+ use crate::ErrorKind;
+
+ use super::ManifestEntry;
+
+ #[derive(Serialize, Deserialize)]
+ pub(super) struct ManifestEntryV2 {
+ status: i32,
+ snapshot_id: Option<i64>,
+ sequence_number: Option<i64>,
+ file_sequence_number: Option<i64>,
+ data_file: DataFile,
+ }
+
+ impl ManifestEntryV2 {
+ pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result<Self, Error> {
+ Ok(Self {
+ status: value.status as i32,
+ snapshot_id: value.snapshot_id,
+ sequence_number: value.sequence_number,
+ file_sequence_number: value.file_sequence_number,
+ data_file: DataFile::try_from(value.data_file, partition_type, false)?,
+ })
+ }
+
+ pub fn try_into(
+ self,
+ partition_type: &StructType,
+ schema: &Schema,
+ ) -> Result<ManifestEntry, Error> {
+ Ok(ManifestEntry {
+ status: self.status.try_into()?,
+ snapshot_id: self.snapshot_id,
+ sequence_number: self.sequence_number,
+ file_sequence_number: self.file_sequence_number,
+ data_file: self.data_file.try_into(partition_type, schema)?,
+ })
+ }
+ }
+
+ #[derive(Serialize, Deserialize)]
+ pub(super) struct ManifestEntryV1 {
+ status: i32,
+ pub snapshot_id: i64,
+ data_file: DataFile,
+ }
+
+ impl ManifestEntryV1 {
+ pub fn try_from(value: ManifestEntry, partition_type: &StructType) -> Result<Self, Error> {
+ Ok(Self {
+ status: value.status as i32,
+ snapshot_id: value.snapshot_id.unwrap_or_default(),
+ data_file: DataFile::try_from(value.data_file, partition_type, true)?,
+ })
+ }
+
+ pub fn try_into(
+ self,
+ partition_type: &StructType,
+ schema: &Schema,
+ ) -> Result<ManifestEntry, Error> {
+ Ok(ManifestEntry {
+ status: self.status.try_into()?,
+ snapshot_id: Some(self.snapshot_id),
+ sequence_number: None,
+ file_sequence_number: None,
+ data_file: self.data_file.try_into(partition_type, schema)?,
+ })
+ }
+ }
+
+ #[serde_as]
+ #[derive(Serialize, Deserialize)]
+ pub(super) struct DataFile {
+ #[serde(default)]
+ content: i32,
+ file_path: String,
+ file_format: String,
+ partition: RawLiteral,
+ record_count: i64,
+ file_size_in_bytes: i64,
+ #[serde(skip_deserializing, skip_serializing_if = "Option::is_none")]
+ block_size_in_bytes: Option<i64>,
+ column_sizes: Option<Vec<I64Entry>>,
+ value_counts: Option<Vec<I64Entry>>,
+ null_value_counts: Option<Vec<I64Entry>>,
+ nan_value_counts: Option<Vec<I64Entry>>,
+ distinct_counts: Option<Vec<I64Entry>>,
+ lower_bounds: Option<Vec<BytesEntry>>,
+ upper_bounds: Option<Vec<BytesEntry>>,
+ key_metadata: Option<ByteBuf>,
+ split_offsets: Option<Vec<i64>>,
+ #[serde(default)]
+ equality_ids: Option<Vec<i32>>,
+ sort_order_id: Option<i32>,
+ }
+
+ impl DataFile {
+ pub fn try_from(
+ value: super::DataFile,
+ partition_type: &StructType,
+ is_version_1: bool,
+ ) -> Result<Self, Error> {
+ let block_size_in_bytes = if is_version_1 { Some(0) } else { None };
+ Ok(Self {
+ content: value.content as i32,
+ file_path: value.file_path,
+ file_format: value.file_format.to_string(),
+ partition: RawLiteral::try_from(
+ Literal::Struct(value.partition),
+ &Type::Struct(partition_type.clone()),
+ )?,
+ record_count: value.record_count,
+ file_size_in_bytes: value.file_size_in_bytes,
+ block_size_in_bytes,
+ column_sizes: value.column_sizes.map(to_i64_entry),
+ value_counts: value.value_counts.map(to_i64_entry),
+ null_value_counts: value.null_value_counts.map(to_i64_entry),
+ nan_value_counts: value.nan_value_counts.map(to_i64_entry),
+ distinct_counts: value.distinct_counts.map(to_i64_entry),
+ lower_bounds: value.lower_bounds.map(to_bytes_entry),
+ upper_bounds: value.upper_bounds.map(to_bytes_entry),
+ key_metadata: value.key_metadata.map(serde_bytes::ByteBuf::from),
+ split_offsets: value.split_offsets,
+ equality_ids: value.equality_ids,
+ sort_order_id: value.sort_order_id,
+ })
+ }
+ pub fn try_into(
+ self,
+ partition_type: &StructType,
+ schema: &Schema,
+ ) -> Result<super::DataFile, Error> {
+ let partition = self
+ .partition
+ .try_into(&Type::Struct(partition_type.clone()))?
+ .map(|v| {
+ if let Literal::Struct(v) = v {
+ Ok(v)
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ "partition value is not a struct",
+ ))
+ }
+ })
+ .transpose()?
+ .unwrap_or(Struct::empty());
+ Ok(super::DataFile {
+ content: self.content.try_into()?,
+ file_path: self.file_path,
+ file_format: self.file_format.parse()?,
+ partition,
+ record_count: self.record_count,
+ file_size_in_bytes: self.file_size_in_bytes,
+ column_sizes: self.column_sizes.map(parse_i64_entry),
+ value_counts: self.value_counts.map(parse_i64_entry),
+ null_value_counts: self.null_value_counts.map(parse_i64_entry),
+ nan_value_counts: self.nan_value_counts.map(parse_i64_entry),
+ distinct_counts: self.distinct_counts.map(parse_i64_entry),
+ lower_bounds: self
+ .lower_bounds
+ .map(|v| parse_bytes_entry(v, schema))
+ .transpose()?,
+ upper_bounds: self
+ .upper_bounds
+ .map(|v| parse_bytes_entry(v, schema))
+ .transpose()?,
+ key_metadata: self.key_metadata.map(|v| v.to_vec()),
+ split_offsets: self.split_offsets,
+ equality_ids: self.equality_ids,
+ sort_order_id: self.sort_order_id,
+ })
+ }
+ }
+
+ #[serde_as]
+ #[derive(Serialize, Deserialize)]
+ #[cfg_attr(test, derive(Debug, PartialEq, Eq))]
+ struct BytesEntry {
+ key: i32,
+ value: serde_bytes::ByteBuf,
+ }
+
+ fn parse_bytes_entry(
+ v: Vec<BytesEntry>,
+ schema: &Schema,
+ ) -> Result<HashMap<i32, Literal>, Error> {
+ let mut m = HashMap::with_capacity(v.len());
+ for entry in v {
+ let data_type = &schema
+ .field_by_id(entry.key)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Can't find field id {} for upper/lower_bounds", entry.key),
+ )
+ })?
+ .field_type;
+ m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+ }
+ Ok(m)
+ }
+
+ fn to_bytes_entry(v: HashMap<i32, Literal>) -> Vec<BytesEntry> {
+ v.into_iter()
+ .map(|e| BytesEntry {
+ key: e.0,
+ value: Into::<ByteBuf>::into(e.1),
+ })
+ .collect()
+ }
+
+ #[derive(Serialize, Deserialize)]
+ #[cfg_attr(test, derive(Debug, PartialEq, Eq))]
+ struct I64Entry {
+ key: i32,
+ value: i64,
+ }
+
+ fn parse_i64_entry(v: Vec<I64Entry>) -> HashMap<i32, i64> {
+ let mut m = HashMap::with_capacity(v.len());
+ for entry in v {
+ m.insert(entry.key, entry.value);
+ }
+ m
+ }
+
+ fn to_i64_entry(entries: HashMap<i32, i64>) -> Vec<I64Entry> {
+ entries
+ .iter()
+ .map(|e| I64Entry {
+ key: *e.0,
+ value: *e.1,
+ })
+ .collect()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::fs;
+
+ use tempfile::TempDir;
+
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::spec::NestedField;
+ use crate::spec::PrimitiveType;
+ use crate::spec::Struct;
+ use crate::spec::Transform;
+ use crate::spec::Type;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_parse_manifest_v2_unpartition() {
+ let path = format!(
+ "{}/testdata/unpartition_manifest_v2.avro",
+ env!("CARGO_MANIFEST_DIR")
+ );
+ let bs = fs::read(path).expect("read_file must succeed");
+ let manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
+ // test metadata
+ assert!(manifest.metadata.schema_id == 0);
+ assert_eq!(manifest.metadata.schema, {
+ let fields = vec![
+ // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz
+ Arc::new(NestedField::optional(
+ 1,
+ "id",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 2,
+ "v_int",
+ Type::Primitive(PrimitiveType::Int),
+ )),
+ Arc::new(NestedField::optional(
+ 3,
+ "v_long",
+ Type::Primitive(PrimitiveType::Long),
+ )),
+ Arc::new(NestedField::optional(
+ 4,
+ "v_float",
+ Type::Primitive(PrimitiveType::Float),
+ )),
+ Arc::new(NestedField::optional(
+ 5,
+ "v_double",
+ Type::Primitive(PrimitiveType::Double),
+ )),
+ Arc::new(NestedField::optional(
+ 6,
+ "v_varchar",
+ Type::Primitive(PrimitiveType::String),
+ )),
+ Arc::new(NestedField::optional(
+ 7,
+ "v_bool",
+
Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 8, + "v_date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 9, + "v_timestamp", + Type::Primitive(PrimitiveType::Timestamptz), + )), + Arc::new(NestedField::optional( + 10, + "v_decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 36, + scale: 10, + }), + )), + Arc::new(NestedField::optional( + 11, + "v_ts_ntz", + Type::Primitive(PrimitiveType::Timestamp), + )), + ]; + Schema::builder().with_fields(fields).build().unwrap() + }); + assert!(manifest.metadata.partition_spec.fields.is_empty()); + assert!(manifest.metadata.content == ManifestContentType::Data); + assert!(manifest.metadata.format_version == FormatVersion::V2); + // test entries + assert!(manifest.entries.len() == 1); + let entry = &manifest.entries[0]; + assert!(entry.status == ManifestStatus::Added); + assert!(entry.snapshot_id == Some(0)); + assert!(entry.sequence_number == Some(1)); + assert!(entry.file_sequence_number == Some(1)); + assert_eq!( + entry.data_file, + DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: Some(HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)])), + value_counts: Some(HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)])), + null_value_counts: Some(HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)])), + nan_value_counts: None, + distinct_counts: None, + lower_bounds: None, + upper_bounds: None, + key_metadata: None, + split_offsets: Some(vec![4]), + equality_ids: None, + sort_order_id: None, + } + ); + } + + #[test] + fn test_parse_manifest_v2_partition() { + let path = format!( + "{}/testdata/partition_manifest_v2.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + assert_eq!(manifest.metadata.schema_id, 0); + assert_eq!(manifest.metadata.schema, { + let fields = vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "v_long", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "v_float", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "v_double", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "v_varchar", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 7, + "v_bool", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 8, + "v_date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 9, + "v_timestamp", + Type::Primitive(PrimitiveType::Timestamptz), + )), + Arc::new(NestedField::optional( + 10, + "v_decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 36, + scale: 10, + }), + )), + Arc::new(NestedField::optional( + 11, + "v_ts_ntz", + Type::Primitive(PrimitiveType::Timestamp), + )), + ]; + Schema::builder().with_fields(fields).build().unwrap() + }); + assert_eq!(manifest.metadata.partition_spec, { + let 
fields = vec![ + PartitionField { + name: "v_int".to_string(), + transform: Transform::Identity, + source_id: 2, + field_id: 1000, + }, + PartitionField { + name: "v_long".to_string(), + transform: Transform::Identity, + source_id: 3, + field_id: 1001, + }, + ]; + PartitionSpec { spec_id: 0, fields } + }); + assert!(manifest.metadata.content == ManifestContentType::Data); + assert!(manifest.metadata.format_version == FormatVersion::V2); + assert_eq!(manifest.entries.len(), 1); + let entry = &manifest.entries[0]; + assert_eq!(entry.status, ManifestStatus::Added); + assert_eq!(entry.snapshot_id, Some(0)); + assert_eq!(entry.sequence_number, Some(1)); + assert_eq!(entry.file_sequence_number, Some(1)); + assert_eq!(entry.data_file.content, DataContentType::Data); + assert_eq!( + entry.data_file.file_path, + "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet" + ); + assert_eq!(entry.data_file.file_format, DataFileFormat::Parquet); + assert_eq!( + entry.data_file.partition, + Struct::from_iter( + vec![ + (1000, Some(Literal::int(1)), "v_int".to_string()), + (1001, Some(Literal::long(1000)), "v_long".to_string()) + ] + .into_iter() + ) + ); + assert_eq!(entry.data_file.record_count, 1); + assert_eq!(entry.data_file.file_size_in_bytes, 5442); + assert_eq!( + entry.data_file.column_sizes, + Some(HashMap::from([ + (0, 73), + (6, 34), + (2, 73), + (7, 61), + (3, 61), + (5, 62), + (9, 79), + (10, 73), + (1, 61), + (4, 73), + (8, 73) + ])) + ); + assert_eq!( + entry.data_file.value_counts, + Some(HashMap::from([ + (4, 1), + (5, 1), + (2, 1), + (0, 1), + (3, 1), + (6, 1), + (8, 1), + (1, 1), + (10, 1), + (7, 1), + (9, 1) + ])) + ); + assert_eq!( + entry.data_file.null_value_counts, + Some(HashMap::from([ + (1, 0), + (6, 0), + (2, 0), + (8, 0), + (0, 0), + (3, 0), + (5, 0), + (9, 0), + (7, 0), + (4, 0), + (10, 0) + ])) + ); + assert_eq!(entry.data_file.nan_value_counts, None); + assert_eq!(entry.data_file.distinct_counts, None); + assert_eq!(entry.data_file.lower_bounds, None); + assert_eq!(entry.data_file.upper_bounds, None); + assert_eq!(entry.data_file.key_metadata, None); + assert_eq!(entry.data_file.split_offsets, Some(vec![4])); + assert_eq!(entry.data_file.equality_ids, None); + assert_eq!(entry.data_file.sort_order_id, None); + } + + #[test] + fn test_parse_manifest_v1_unpartition() { + let path = format!( + "{}/testdata/unpartition_manifest_v1.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + // test metadata + assert!(manifest.metadata.schema_id == 0); + assert_eq!(manifest.metadata.schema, { + let fields = vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "data", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "comment", + Type::Primitive(PrimitiveType::String), + )), + ]; + Schema::builder() + .with_schema_id(1) + .with_fields(fields) + .build() + .unwrap() + }); + assert!(manifest.metadata.partition_spec.fields.is_empty()); + assert!(manifest.metadata.content == ManifestContentType::Data); + assert!(manifest.metadata.format_version == FormatVersion::V1); + assert_eq!(manifest.entries.len(), 4); + let entry = &manifest.entries[0]; + assert!(entry.status == ManifestStatus::Added); + assert!(entry.snapshot_id == Some(2966623707104393227)); + assert!(entry.sequence_number.is_none()); + 
assert!(entry.file_sequence_number.is_none()); + assert_eq!( + entry.data_file, + DataFile { + content: DataContentType::Data, + file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 875, + column_sizes: Some(HashMap::from([(1,47),(2,48),(3,52)])), + value_counts: Some(HashMap::from([(1,1),(2,1),(3,1)])), + null_value_counts: Some(HashMap::from([(1,0),(2,0),(3,0)])), + nan_value_counts: Some(HashMap::new()), + distinct_counts: None, + lower_bounds: Some(HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))])), + upper_bounds: Some(HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))])), + key_metadata: None, + split_offsets: Some(vec![4]), + equality_ids: None, + sort_order_id: Some(0), + } + ); + } + + #[test] + fn test_parse_manifest_v1_partition() { + let path = format!( + "{}/testdata/partition_manifest_v1.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + // test metadata + assert!(manifest.metadata.schema_id == 0); + assert_eq!(manifest.metadata.schema, { + let fields = vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "data", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "category", + Type::Primitive(PrimitiveType::String), + )), + ]; + Schema::builder().with_fields(fields).build().unwrap() + }); + assert_eq!(manifest.metadata.partition_spec, { + let fields = vec![PartitionField { + name: "category".to_string(), + transform: Transform::Identity, + source_id: 3, + field_id: 1000, + }]; + PartitionSpec { spec_id: 0, fields } + }); + assert!(manifest.metadata.content == ManifestContentType::Data); + assert!(manifest.metadata.format_version == FormatVersion::V1); + + // test entries + assert!(manifest.entries.len() == 1); + let entry = &manifest.entries[0]; + assert!(entry.status == ManifestStatus::Added); + assert!(entry.snapshot_id == Some(8205833995881562618)); + assert!(entry.sequence_number.is_none()); + assert!(entry.file_sequence_number.is_none()); + assert_eq!(entry.data_file.content, DataContentType::Data); + assert_eq!( + entry.data_file.file_path, + "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet" + ); + assert_eq!(entry.data_file.file_format, DataFileFormat::Parquet); + assert_eq!( + entry.data_file.partition, + Struct::from_iter( + vec![( + 1000, + Some( + Literal::try_from_bytes(&[120], &Type::Primitive(PrimitiveType::String)) + .unwrap() + ), + "category".to_string() + )] + .into_iter() + ) + ); + assert_eq!(entry.data_file.record_count, 1); + assert_eq!(entry.data_file.file_size_in_bytes, 874); + assert_eq!( + entry.data_file.column_sizes, + Some(HashMap::from([(1, 46), (2, 48), (3, 48)])) + ); + assert_eq!( + entry.data_file.value_counts, + Some(HashMap::from([(1, 1), (2, 1), (3, 1)])) + ); + assert_eq!( + entry.data_file.null_value_counts, + Some(HashMap::from([(1, 0), (2, 0), (3, 0)])) + ); + assert_eq!(entry.data_file.nan_value_counts, Some(HashMap::new())); + assert_eq!(entry.data_file.distinct_counts, None); + assert_eq!( + entry.data_file.lower_bounds, + Some(HashMap::from([ + (1, 
Literal::long(1)), + (2, Literal::string("a")), + (3, Literal::string("x")) + ])) + ); + assert_eq!( + entry.data_file.upper_bounds, + Some(HashMap::from([ + (1, Literal::long(1)), + (2, Literal::string("a")), + (3, Literal::string("x")) + ])) + ); + assert_eq!(entry.data_file.key_metadata, None); + assert_eq!(entry.data_file.split_offsets, Some(vec![4])); + assert_eq!(entry.data_file.equality_ids, None); + assert_eq!(entry.data_file.sort_order_id, Some(0)); + } + + #[tokio::test] + async fn test_writer_manifest_v1_partition() { + // Read manifest + let path = format!( + "{}/testdata/partition_manifest_v1.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + + // Write manifest + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("manifest_list_v1.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let writer = ManifestWriter::new(output_file, 1, 1, None); + let entry = writer.write(manifest.clone()).await.unwrap(); + + // Check partition summary + assert_eq!(entry.partitions.len(), 1); + assert_eq!(entry.partitions[0].lower_bound, Some(Literal::string("x"))); + assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x"))); + + // Verify manifest + let bs = fs::read(path).expect("read_file must succeed"); + let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + + assert_eq!(actual_manifest, manifest); + } + + #[tokio::test] + async fn test_writer_manifest_v2_partition() { + // Read manifest + let path = format!( + "{}/testdata/partition_manifest_v2.avro", + env!("CARGO_MANIFEST_DIR") + ); + let bs = fs::read(path).expect("read_file must succeed"); + let manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + + // Write manifest + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("manifest_list_v2.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let writer = ManifestWriter::new(output_file, 1, 1, None); + let _ = writer.write(manifest.clone()).await.unwrap(); + + // Verify manifest + let bs = fs::read(path).expect("read_file must succeed"); + let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); + + assert_eq!(actual_manifest, manifest); + } +} diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index f20fd9bf1..7790ac223 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -17,9 +17,9 @@ //! ManifestList for Iceberg. -use std::collections::HashMap; +use std::{collections::HashMap, str::FromStr}; -use crate::{io::OutputFile, spec::Literal, Error}; +use crate::{io::OutputFile, spec::Literal, Error, ErrorKind}; use apache_avro::{from_value, types::Value, Reader, Writer}; use futures::AsyncWriteExt; @@ -30,6 +30,9 @@ use self::{ use super::{FormatVersion, StructType}; +/// The seq number when no added files are present. +pub const UNASSIGNED_SEQ_NUMBER: i64 = -1; + /// Snapshots are embedded in table metadata, but the list of manifests for a /// snapshot are stored in a separate manifest list file. 
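///
/// After a manifest is written, the returned [`ManifestListEntry`] is what
/// gets recorded here. A hedged sketch (writer setup elided):
///
/// ```ignore
/// let entry = writer.write(manifest).await?;
/// if entry.min_sequence_number == UNASSIGNED_SEQ_NUMBER {
///     // the manifest carried no live entries with a sequence number
/// }
/// ```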
///
@@ -466,80 +469,80 @@ pub struct ManifestListEntry {
 /// field: 500
 ///
 /// Location of the manifest file
- manifest_path: String,
+ pub manifest_path: String,
 /// field: 501
 ///
 /// Length of the manifest file in bytes
- manifest_length: i64,
+ pub manifest_length: i64,
 /// field: 502
 ///
 /// ID of a partition spec used to write the manifest; must be listed
 /// in table metadata partition-specs
- partition_spec_id: i32,
+ pub partition_spec_id: i32,
 /// field: 517
 ///
 /// The type of files tracked by the manifest, either data or delete
 /// files; 0 for all v1 manifests
- content: ManifestContentType,
+ pub content: ManifestContentType,
 /// field: 515
 ///
 /// The sequence number when the manifest was added to the table; use 0
 /// when reading v1 manifest lists
- sequence_number: i64,
+ pub sequence_number: i64,
 /// field: 516
 ///
 /// The minimum data sequence number of all live data or delete files in
 /// the manifest; use 0 when reading v1 manifest lists
- min_sequence_number: i64,
+ pub min_sequence_number: i64,
 /// field: 503
 ///
 /// ID of the snapshot where the manifest file was added
- added_snapshot_id: i64,
+ pub added_snapshot_id: i64,
 /// field: 504
 ///
 /// Number of entries in the manifest that have status ADDED, when null
 /// this is assumed to be non-zero
- added_data_files_count: Option<i32>,
+ pub added_data_files_count: Option<i32>,
 /// field: 505
 ///
 /// Number of entries in the manifest that have status EXISTING (0),
 /// when null this is assumed to be non-zero
- existing_data_files_count: Option<i32>,
+ pub existing_data_files_count: Option<i32>,
 /// field: 506
 ///
 /// Number of entries in the manifest that have status DELETED (2),
 /// when null this is assumed to be non-zero
- deleted_data_files_count: Option<i32>,
+ pub deleted_data_files_count: Option<i32>,
 /// field: 512
 ///
 /// Number of rows in all of files in the manifest that have status
 /// ADDED, when null this is assumed to be non-zero
- added_rows_count: Option<i64>,
+ pub added_rows_count: Option<i64>,
 /// field: 513
 ///
 /// Number of rows in all of files in the manifest that have status
 /// EXISTING, when null this is assumed to be non-zero
- existing_rows_count: Option<i64>,
+ pub existing_rows_count: Option<i64>,
 /// field: 514
 ///
 /// Number of rows in all of files in the manifest that have status
 /// DELETED, when null this is assumed to be non-zero
- deleted_rows_count: Option<i64>,
+ pub deleted_rows_count: Option<i64>,
 /// field: 507
 /// element_field: 508
 ///
 /// A list of field summaries for each partition field in the spec. Each
 /// field in the list corresponds to a field in the manifest file’s
 /// partition spec.
- partitions: Vec<FieldSummary>,
+ pub partitions: Vec<FieldSummary>,
 /// field: 519
 ///
 /// Implementation-specific key metadata for encryption
- key_metadata: Vec<u8>,
+ pub key_metadata: Vec<u8>,
 }
 
 /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
-#[derive(Debug, PartialEq, Clone)]
+#[derive(Debug, PartialEq, Clone, Eq)]
 pub enum ManifestContentType {
 /// The manifest content is data.
 Data = 0,
@@ -547,6 +550,30 @@ pub enum ManifestContentType {
 /// The manifest content is deletes.
 Deletes = 1,
 }
 
+impl FromStr for ManifestContentType {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "data" => Ok(ManifestContentType::Data),
+ "deletes" => Ok(ManifestContentType::Deletes),
+ _ => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid manifest content type: {s}"),
+ )),
+ }
+ }
+}
+
+impl ToString for ManifestContentType {
+ fn to_string(&self) -> String {
+ match self {
+ ManifestContentType::Data => "data".to_string(),
+ ManifestContentType::Deletes => "deletes".to_string(),
+ }
+ }
+}
+
 impl TryFrom<i32> for ManifestContentType {
 type Error = Error;
 
@@ -568,6 +595,25 @@ impl TryFrom<i32> for ManifestContentType {
 /// Field summary for partition field in the spec.
 ///
 /// Each field in the list corresponds to a field in the manifest file’s partition spec.
-#[derive(Debug, PartialEq, Eq, Clone)]
+#[derive(Debug, PartialEq, Eq, Clone, Default)]
 pub struct FieldSummary {
 /// field: 509
 ///
 /// Whether the manifest contains at least one partition with a null
 /// value for the field
- contains_null: bool,
+ pub contains_null: bool,
 /// field: 518
 /// Whether the manifest contains at least one partition with a NaN
 /// value for the field
- contains_nan: Option<bool>,
+ pub contains_nan: Option<bool>,
 /// field: 510
 /// The minimum value for the field in the manifests
 /// partitions.
- lower_bound: Option<Literal>,
+ pub lower_bound: Option<Literal>,
 /// field: 511
 /// The maximum value for the field in the manifests
 /// partitions.
- upper_bound: Option<Literal>,
+ pub upper_bound: Option<Literal>,
 }
 
 /// This is a helper module that defines types to help with serialization/deserialization.
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 33d8bf958..199fc4a16 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -18,6 +18,7 @@
 //! Spec for Iceberg.
 
 mod datatypes;
+mod manifest;
 mod manifest_list;
 mod partition;
 mod schema;
@@ -28,6 +29,7 @@ mod transform;
 mod values;
 
 pub use datatypes::*;
+pub use manifest::*;
 pub use manifest_list::*;
 pub use partition::*;
 pub use schema::*;
diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs
index 484ec7e56..fcc7efae8 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -22,7 +22,9 @@ use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use typed_builder::TypedBuilder;
 
-use super::transform::Transform;
+use crate::{Error, ErrorKind};
+
+use super::{transform::Transform, NestedField, Schema, StructType};
 
 /// Reference to [`PartitionSpec`].
 pub type PartitionSpecRef = Arc<PartitionSpec>;
@@ -69,6 +71,30 @@ impl PartitionSpec {
 .iter()
 .all(|f| matches!(f.transform, Transform::Void))
 }
+
+ /// Returns the partition type of this partition spec.
+ pub fn partition_type(&self, schema: &Schema) -> Result<StructType, Error> {
+ let mut fields = Vec::with_capacity(self.fields.len());
+ for partition_field in &self.fields {
+ let field = schema
+ .field_by_id(partition_field.source_id)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "No column with source column id {} in schema {:?}",
+ partition_field.source_id, schema
+ ),
+ )
+ })?;
+ let res_type = partition_field.transform.result_type(&field.field_type)?;
+ let field =
+ NestedField::optional(partition_field.field_id, &partition_field.name, res_type)
+ .into();
+ fields.push(field);
+ }
+ Ok(StructType::new(fields))
+ }
 }
 
 /// Reference to [`UnboundPartitionSpec`].
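The new `PartitionSpec::partition_type` above resolves each partition field's
source column in the schema and applies the transform's result type. A minimal
sketch of calling it (the schema, spec, and ids here are illustrative, not from
the patch):

```rust
// Illustrative only: one long source column `id` (field id 1) bucketed
// into 16 buckets. Bucket transforms yield int, so the derived partition
// type is a struct with a single optional int field "id_bucket".
let schema = Schema::builder()
    .with_fields(vec![Arc::new(NestedField::optional(
        1,
        "id",
        Type::Primitive(PrimitiveType::Long),
    ))])
    .build()?;
let spec = PartitionSpec {
    spec_id: 0,
    fields: vec![PartitionField {
        name: "id_bucket".to_string(),
        transform: Transform::Bucket(16),
        source_id: 1,
        field_id: 1000,
    }],
};
let partition_type = spec.partition_type(&schema)?;
```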
diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs
index 3aa1f8b5b..724498b45 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -686,7 +686,6 @@ pub(super) mod _serde {
 impl TryFrom<SchemaV2> for Schema {
 type Error = Error;
 fn try_from(value: SchemaV2) -> Result<Self, Self::Error> {
- dbg!(&value);
 Schema::builder()
 .with_schema_id(value.schema_id)
 .with_fields(value.fields.fields().iter().cloned())
diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs
index b66e059ed..839d582dc 100644
--- a/crates/iceberg/src/spec/transform.rs
+++ b/crates/iceberg/src/spec/transform.rs
@@ -126,17 +126,20 @@ pub enum Transform {
 impl Transform {
 /// Get the return type of transform given the input type.
- /// Returns `None` if it can't be transformed.
- pub fn result_type(&self, input_type: &Type) -> Option<Type> {
+ /// Returns an error if it can't be transformed.
+ pub fn result_type(&self, input_type: &Type) -> Result<Type, Error> {
 match self {
 Transform::Identity => {
 if matches!(input_type, Type::Primitive(_)) {
- Some(input_type.clone())
+ Ok(input_type.clone())
 } else {
- None
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("{input_type} is not a valid input type of identity transform",),
+ ))
 }
 }
- Transform::Void => Some(input_type.clone()),
- Transform::Unknown => Some(Type::Primitive(PrimitiveType::String)),
+ Transform::Void => Ok(input_type.clone()),
+ Transform::Unknown => Ok(Type::Primitive(PrimitiveType::String)),
 Transform::Bucket(_) => {
 if let Type::Primitive(p) = input_type {
 match p {
@@ -150,11 +153,17 @@ impl Transform {
 | PrimitiveType::String
 | PrimitiveType::Uuid
 | PrimitiveType::Fixed(_)
- | PrimitiveType::Binary => Some(Type::Primitive(PrimitiveType::Int)),
- _ => None,
+ | PrimitiveType::Binary => Ok(Type::Primitive(PrimitiveType::Int)),
+ _ => Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("{input_type} is not a valid input type of bucket transform",),
+ )),
 }
 } else {
- None
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("{input_type} is not a valid input type of bucket transform",),
+ ))
 }
 }
 Transform::Truncate(_) => {
@@ -164,11 +173,17 @@ impl Transform {
 | PrimitiveType::Long
 | PrimitiveType::String
 | PrimitiveType::Binary
- | PrimitiveType::Decimal { .. } => Some(input_type.clone()),
- _ => None,
+ | PrimitiveType::Decimal { .. } => Ok(input_type.clone()),
} => Ok(input_type.clone()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("{input_type} is not a valid input type of truncate transform",), + )), } } else { - None + Err(Error::new( + ErrorKind::DataInvalid, + format!("{input_type} is not a valid input type of truncate transform",), + )) } } Transform::Year | Transform::Month | Transform::Day => { @@ -176,23 +191,35 @@ impl Transform { match p { PrimitiveType::Timestamp | PrimitiveType::Timestamptz - | PrimitiveType::Date => Some(Type::Primitive(PrimitiveType::Int)), - _ => None, + | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("{input_type} is not a valid input type of {self} transform",), + )), } } else { - None + Err(Error::new( + ErrorKind::DataInvalid, + format!("{input_type} is not a valid input type of {self} transform",), + )) } } Transform::Hour => { if let Type::Primitive(p) = input_type { match p { PrimitiveType::Timestamp | PrimitiveType::Timestamptz => { - Some(Type::Primitive(PrimitiveType::Int)) + Ok(Type::Primitive(PrimitiveType::Int)) } - _ => None, + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("{input_type} is not a valid input type of {self} transform",), + )), } } else { - None + Err(Error::new( + ErrorKind::DataInvalid, + format!("{input_type} is not a valid input type of {self} transform",), + )) } } } @@ -367,7 +394,7 @@ mod tests { } for (input_type, result_type) in param.trans_types { - assert_eq!(result_type, trans.result_type(&input_type)); + assert_eq!(result_type, trans.result_type(&input_type).ok()); } } diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index bd9f2e848..de3da0497 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -34,7 +34,6 @@ use uuid::Uuid; use crate::{Error, ErrorKind}; use super::datatypes::{PrimitiveType, Type}; -use super::MAX_DECIMAL_PRECISION; pub use _serde::RawLiteral; @@ -609,6 +608,16 @@ pub struct Struct { } impl Struct { + /// Create an empty struct. + pub fn empty() -> Self { + Self { + fields: Vec::new(), + field_ids: Vec::new(), + field_names: Vec::new(), + null_bitmap: BitVec::new(), + } + } + + /// Create an iterator to read the fields in order of (field_id, field_value, field_name).
pub fn iter(&self) -> impl Iterator, &str)> { self.null_bitmap @@ -906,34 +915,6 @@ impl Literal { } } - /// Get datatype of value - pub fn datatype(&self) -> Type { - match self { - Literal::Primitive(prim) => match prim { - PrimitiveLiteral::Boolean(_) => Type::Primitive(PrimitiveType::Boolean), - PrimitiveLiteral::Int(_) => Type::Primitive(PrimitiveType::Int), - PrimitiveLiteral::Long(_) => Type::Primitive(PrimitiveType::Long), - PrimitiveLiteral::Float(_) => Type::Primitive(PrimitiveType::Float), - PrimitiveLiteral::Double(_) => Type::Primitive(PrimitiveType::Double), - PrimitiveLiteral::Date(_) => Type::Primitive(PrimitiveType::Date), - PrimitiveLiteral::Time(_) => Type::Primitive(PrimitiveType::Time), - PrimitiveLiteral::Timestamp(_) => Type::Primitive(PrimitiveType::Timestamp), - PrimitiveLiteral::TimestampTZ(_) => Type::Primitive(PrimitiveType::Timestamptz), - PrimitiveLiteral::Fixed(vec) => { - Type::Primitive(PrimitiveType::Fixed(vec.len() as u64)) - } - PrimitiveLiteral::Binary(_) => Type::Primitive(PrimitiveType::Binary), - PrimitiveLiteral::String(_) => Type::Primitive(PrimitiveType::String), - PrimitiveLiteral::UUID(_) => Type::Primitive(PrimitiveType::Uuid), - PrimitiveLiteral::Decimal(_) => Type::Primitive(PrimitiveType::Decimal { - precision: MAX_DECIMAL_PRECISION, - scale: 0, - }), - }, - _ => unimplemented!(), - } - } - /// Convert Value to the any type pub fn into_any(self) -> Box { match self { diff --git a/crates/iceberg/testdata/partition_manifest_v1.avro b/crates/iceberg/testdata/partition_manifest_v1.avro new file mode 100644 index 0000000000000000000000000000000000000000..d1ada6188830738b625928ec1567eaef01534774 GIT binary patch literal 6067 zcmb_gO=uid9B&fvqVyou2CX;@^ibI7&P>{T;L#QewU#zwXbh7#vv0C5J3F(TkCa%V z7F4JpmYBnmMq^5qDiv=E(t|BM^xTUc1;MY2cR}#~-psuBb}}>BYy(RmGw=WP`+vOO ze_pqih7LXz+D+U>w??k0(Mkua(`pz6u^p*3HF+~e8tQbtraA<>jZmGQyQ0!3T21v( zo1TaNYHENl#U#M+!a;6`Ynd|F_ZHRFS|X!!nFfkbg%r3zBsqi2*(kz`ez4Mq?ELDp zXgTm-N}{;)GLJ97wjWTRQ~%q@BMuHD6MGTUEJd0g2_qE6A&8_95s%`-&L!~B2(z;u z>V!={G8rz3z?|yEt_vazj`^C}zz&MtNS&t7Wlfr-iGpN0#KqijX$*m$`IBTQ^@R$X z9TYXW2AW+8Ex6#@ki0WKKuUa1dE@+P6y*O%hIes`15$YVbQ69fmFGfHrm|QASAS#nl zhp6XQdvVYu9FkHpZ9irOkpjVzoB@0`Zd*7|d`C$RhYInOs0q*{iIpKXB-IS!ki3me z;+fV;gr)N9F)-st$Yt6TdYr1yj?22oCEb?m+by$Nxh#8JlC^y|ZhK5vc$C#Q&kqU$ zM4OaExD6G_pXYn8rD+r9vZhUD#nw~4ElhQ!sOEbPBqg_&ksFIXN&2iOZ`vfwr0Xvd z8@V&dsB5E+x=Jg6^}=hYL-UkGsE~#d`S7FYGGL)e8YFCiN9EmHpUnG|6vzHRH3d-V z^+BMT%DW|SO^Zp&x1JRJU{JAMs6iFlZrp%*pwJ1YAg`gEMQ5wAlbxheCQIr*0;p>g zz~VtsU2D2Lmv*f+T_32{7*b3*)L=4E$}3Hi=dp*peFw{!1h9;$0jkZ|(F9f1W=stT zRyJ3mt`vqEAFW9TR~DNg4-ePBj04jG%z@x!1LrYjntp=T*~9?Q=_kq?wP33269Zw# z;#F9@l_2Tz;Ey|<{RN3mGh>ixrptpO*Ng=q8H0^87H>Zw$<+>1#BAdTrTjKWITGji zeRI;YgOv_rUm&Msatd6}R%Kocc^ZWsmqe!TI3bSuuios2#4c_F?`14jGYoMVUZ83t zWC4j2y35CrA=9GJ55S|K0rgTug6U1pLAU^Cfeyl1<@BqSQIF@j7@e|*7{|B$2DT4^ zcP{J)M-tLGwv2-i$m6i4aw0hvcCZbx4%d$m8v`#}OxJ_ok8+CA-}(a!QO5VP-9W5B zut13oh(=4Bx_4;`d$t52+V}R9Ee1kVwz@MKcE3yOTU|uJsAjq)6sz0I*1%(}ixrVs z;QK;|MVu#wye(;1%0TVaA?4olshNWX|-ipRk83VJFG^cMptqNKAR1|2GIe`2U+ z<2)!$lzI{I-eY7sG=5oR(4&tV*$$e_!!V#M(y3g)i1!$U0fkf&g#oF1{3v$iuRJbY zbYG6D?lC6)L}^SqV3);Y5f8C1VeG+}FtTgs$jnl>FWNpwB8BF@T1GE#8AEEB!QGcR zA9vpn_ZZZJOz%qESGfbph^gdx!i>1S@|3Q`4xf%L3F&vV9GBw{V8oa8(k~ zuR@_82R3FiAFf&H>jnE}ci`}EfBZ9Se*g22pB#$sj=pei_|IY6`uEX0&AaQ z!oouHi`MNM)`m-5$JuUmmzHj|tZw&Ci>x=ly0O0ATq4&u*PVv-b?dEVH) X+L>$anP+}^f3(}3eD9;7d>H=&fS%yT literal 0 HcmV?d00001 diff --git a/crates/iceberg/testdata/partition_manifest_v2.avro b/crates/iceberg/testdata/partition_manifest_v2.avro new file mode 100644 index 
0000000000000000000000000000000000000000..9d10a8711e98d44d1cc9249b8c01396c004f8882 GIT binary patch literal 3350 zcmcgvzmMZY6n2KyDq5|=YPH(x3YZF{_+mSoO*S>uR8*(cXkw3V&Dk?%W*n3pDJA6Tw}T zv+xJ>>?KyBkRfxE2v37PjuVzf_QCf&ua7y+7@p!eNw~&+tR>|t7BYi(NyIofTj2Zt zD_(|MUi^laRPg7mZf>6#&s4i}CKYf*!6ltb@l=I(ETezC{}Ss(ZrDf`3K@Z)HM@-R zBxXtzAS4&?-bSJD6vA7ZC{1;t%+{p~6OQ3sEpwWyJD~}rp|W{~d6A~@Xi4Apal~R; zq#94n`zDxuUN9aq!i#LiXG?qq30u%<4|8joR7z-fq5iSK6A^^)jsYlxt3^E_@ZXyUe%ujo8Y@ zBcUjoS+c-}Rawqt$>7kC*+R2M-)-B5A}umb6j*Ppsz~aLP${YD>qN5*+PZ$feGp#w z{+P^te*(6zm-huTK6W2~IZcaA!Bsjr6DD3ns}7PEmA_&Iztf=EskvULMu|>G~qi56j(j{qWIU z{~+$rg(PE|nn_XL1l47OOBZ_%yvox=6A{OX>4(>?x=Y&UCk#Fco)<8D-@vV%P5ami z31npi)!H@GcMb`2;oakUG+B`ZVhDM?^=Ryx)LmFr5{ih0U2~J1mICG96Ochxs#yI5 z;;C^38+o-6LY9z(xD&}>A*QKf_03$52!jDhwk5&ZVAZTPD3D+^%DU@=W_rMGR}Ea8 z3TmW48?anAW6$h3Y9J#~%&ZD&gRTuK>~#Y>r!u^wMnkoM8;ZkhJZ^wzLKwZ)hBV8J zaw}F0bkcxAUCsDwgWBba?@a@zlMI9>&GJ3q+D0$yz5S-PhDJ&_E=| literal 0 HcmV?d00001 diff --git a/crates/iceberg/testdata/unpartition_manifest_v1.avro b/crates/iceberg/testdata/unpartition_manifest_v1.avro new file mode 100644 index 0000000000000000000000000000000000000000..a4ef37646fbd6bc26f51c4149ea5891a1f0f366a GIT binary patch literal 6032 zcmb_gd2AF_9EJj+CWa#%A+aRGgali+b!Vou+kyzv5-zKyEk#S&%+AizX=i7)GqWwF ztO$mXXyq`rh>cpJLBooXIwjNv_(X^l-3O`Dj2kfHowW2f^;e4p+k}=2X(r% zuAR1?D0~>5hyOgZCN@VUO@yN%Ue-mA?MzY(I_5D=SPyNP1bKr`V+0vU%0|bIbL<2) z92UW2>Ug$keQYM*qNxQYhOPn!ohAyZhBCHN3G<2+5_Q85ezi7u?!svu(GA{+>hP6C z5W-}g!{m4oK>Mu1NAw2O@MBg?gF0G?$}*V9Dxj+O&_OZ8M`eR9MbF)y7?&r+kv}BK zq7yhJ2934zG3#UHf{gtU-e_<IH+w(GrO_^sXlujvy%Hg3@O+bIB2(L9q#dsbWuNyy{FeH?m z6C*W##v8{#eJnnC)) z#=Pj#X3;Mx{y>`{x_TxqK(%V{GWJHH4I;8TN#-sk13^5FCL# zx-q^Os-&!LlmuR`Hn&rlkI*qRL%4yT<|C+1CPGRIQicj2#4aWlIy)%oO^{LQk@NZz zb26J_bf_u;s#tF(RFx#AMR>KE$yIOGI|8ckEO?*`1vwgoA*Rq;qIe}pRqz$*xpOCp zto*pBqevjjAOhp2+u610dyRE3lLz)1%VtC%W(qrp9yB*f>P}NQ74P@0a;v0 z2HnQpHw5XrjVsBJEPJm)Us0GfXSJFWoVM9G_wta{R#Ec@fOF6!_QDy#PGgHupA~07 z9b1%oP?KzRwm2j1FkTAdO-mB%Ui{HWWOPYF+l2 z_8U@^MI?evVa9RfWmtvW8hikh7xA5X9dT}1q^lZaqy?d0k||dtc^wj};4El3I7_W4 zON+YDtRs0*p)ETd3v)pYioztwPUa0U$5e{>EuyA_;>%)aNB!mN5m5lFS>7B;5J8u1 zv^4#|Tt}iLyRmpJDM4;5@WdABwxB1s7P#{yzqG*3$F_=-_QW*)!h_$<0S~N)U>31a(Ba#`gXb?z#X(Wf+O;W7Gv2EML&s02yzMz4V64v z8iML$#3i{cCP@9{UhSze^<;YKPl6Q2^OUH|hY~8~C4fYk??i$9l0^Qe3q!p494`qM z0b(B4!Vb?!YI|tdbLOgHit#__#*eQ~c#T_MK@4-Nm)d-q-;xlh)j`Y;OSTgXB@PDc zmk3M}U}zY?5t5BbFt9LPqk|#6o_0VnWSo;p=V2maVRTa=gDD#+vipRt1A~LMur_28 zO*>2!95hl26daUp2iu7qG0dvKqRca&*c>yJgvJ`Qg z?I=q+`uQqBjX4is#aP#L`vh?qSJ5P@%}KA2tcXN`M!dyU(b~->2k}A;q!)a0oEI%t zR^#jH$Mmg@bU*IDNN3GBHD&MZI|JF5CLF)KpyHf&*2G9(S>=wh9kbqO-?sbeu6y2T zv5%KdcbVU>^{1y@6Lyfb2@&z*MFSKJ-_(%1HFR}WglL8Yeh9x zK1W$mq)-E|y(_A^zd+UX# zmgH4z`yy}8_PpG)dGhIw+0}vKm7Ogo7M#1%v8Lq6um2tVdGO|;MbBovJoov3Pmfu3 z$TR84D^2q%$CNF9YYJVy{;Rch?{pn~?%=n%HNDsGT)K8z`gCV&tnP`ysawY-^8P<6 COY>d; literal 0 HcmV?d00001 diff --git a/crates/iceberg/testdata/unpartition_manifest_v2.avro b/crates/iceberg/testdata/unpartition_manifest_v2.avro new file mode 100644 index 0000000000000000000000000000000000000000..7652159f3780b804831825a1c48ee040c68a4dcf GIT binary patch literal 3095 zcmcgu%Wm5^6qVUDFoM7c(jco2T5#0@QEbVLlek@WKdYiZgMgqUs$naUDoMGBGuibE zyy|McV;23Ke!*Z?vuQ6SOI98-QpBhhi7DQ5d31TmbN>uJbb7r|3d4lyM%2(!|itrv5G_~WD3LZ0U2;kH?ypK6cB8|r<7gdNh)Z`kb<|()^ z_;PPCEoi3Cq#@jZId3QDWrU)SbG&pRX&&pU$Hn-%}aY>wMCIiX3u*vt7R!M86_VgV^@u^Q^f zK;0kw4(j$ykyZ2eJWBAM{hhdj3Q5WgwU_sey5lm%k^`@^gc~B`STl3?eXH)0 zc1r^b(a9uKZu%=V2}cNPUr1=Zk+!9+|G&8pL} 
zFd2*SYs>^Z##&JutTI)L8a7ac54FQ;o3gkU4P2ba!fM;rE9b4jS>!F=kq??SVYzZRUi4w0KIFQJ+UV8;}#)YynZnr*vB2X0X-PSK}_A)k7M`6AUt-1z&~^kMh7$+MWG+CBew|Nhm!;?uIbJH zqc3mJt~2XwzFwPkdeimk#$VficXnnSH0{nhTeFUXcDrZ;tpTfbw28KW+79ZWZ5Zz& eXVY18wj9SrbP#6AQ|Gb|P)9Fb7 literal 0 HcmV?d00001 From a72cecd3c0448048de3e4e78e38a2499fced6d3b Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 12 Dec 2023 14:48:44 +0800 Subject: [PATCH 2/7] refine in memory struct --- crates/iceberg/src/spec/manifest.rs | 160 +++++++++--------- crates/iceberg/src/spec/manifest_list.rs | 198 ++++++++++++++--------- 2 files changed, 207 insertions(+), 151 deletions(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 2a5d58728..5540d32de 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -21,7 +21,7 @@ use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; use super::Literal; use super::{ FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema, - Struct, UNASSIGNED_SEQ_NUMBER, + Struct, }; use crate::io::OutputFile; use crate::spec::PartitionField; @@ -87,12 +87,12 @@ pub struct ManifestWriter { snapshot_id: i64, - added_files: i32, - added_rows: i64, - existing_files: i32, - existing_rows: i64, - deleted_files: i32, - deleted_rows: i64, + added_files: u32, + added_rows: u64, + existing_files: u32, + existing_rows: u64, + deleted_files: u32, + deleted_rows: u64, seq_num: i64, min_seq_num: Option, @@ -139,7 +139,6 @@ impl ManifestWriter { if let Some(nan) = &entry.data_file.nan_value_counts { for (&k, &v) in nan { let field_summary = self.field_summary.entry(k).or_default(); - assert!(v >= 0); if v > 0 { field_summary.contains_nan = Some(true); } @@ -187,7 +186,7 @@ impl ManifestWriter { } /// Write a manifest entry. 
- pub async fn write(mut self, manifest: Manifest) -> Result { + pub async fn write(mut self, manifest: Manifest) -> Result, Error> { // Create the avro writer let partition_type = manifest .metadata @@ -289,23 +288,28 @@ impl ManifestWriter { let partition_summary = self.get_field_summary_vec(&manifest.metadata.partition_spec.fields); - Ok(ManifestListEntry { - manifest_path: self.output.location().to_string(), - manifest_length: length as i64, - partition_spec_id: manifest.metadata.partition_spec.spec_id, - content: manifest.metadata.content, - sequence_number: self.seq_num, - min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQ_NUMBER), - added_snapshot_id: self.snapshot_id, - added_data_files_count: Some(self.added_files), - existing_data_files_count: Some(self.existing_files), - deleted_data_files_count: Some(self.deleted_files), - added_rows_count: Some(self.added_rows), - existing_rows_count: Some(self.existing_rows), - deleted_rows_count: Some(self.deleted_rows), - partitions: partition_summary, - key_metadata: self.key_metadata.unwrap_or_default(), - }) + if let Some(min_sequence_number) = self.min_seq_num { + Ok(Some(ManifestListEntry { + manifest_path: self.output.location().to_string(), + manifest_length: length as i64, + partition_spec_id: manifest.metadata.partition_spec.spec_id, + content: manifest.metadata.content, + sequence_number: self.seq_num, + min_sequence_number, + added_snapshot_id: self.snapshot_id, + added_data_files_count: Some(self.added_files), + existing_data_files_count: Some(self.existing_files), + deleted_data_files_count: Some(self.deleted_files), + added_rows_count: Some(self.added_rows), + existing_rows_count: Some(self.existing_rows), + deleted_rows_count: Some(self.deleted_rows), + partitions: partition_summary, + key_metadata: self.key_metadata.unwrap_or_default(), + })) + } else { + // All entries are deleted + Ok(None) + } } } @@ -324,7 +328,7 @@ mod _const_schema { Error, }; - pub static STATUS: Lazy = { + static STATUS: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 0, @@ -334,7 +338,7 @@ mod _const_schema { }) }; - pub static SNAPSHOT_ID_V1: Lazy = { + static SNAPSHOT_ID_V1: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 1, @@ -344,7 +348,7 @@ mod _const_schema { }) }; - pub static SNAPSHOT_ID_V2: Lazy = { + static SNAPSHOT_ID_V2: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 1, @@ -354,7 +358,7 @@ mod _const_schema { }) }; - pub static SEQUENCE_NUMBER: Lazy = { + static SEQUENCE_NUMBER: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 3, @@ -364,7 +368,7 @@ mod _const_schema { }) }; - pub static FILE_SEQUENCE_NUMBER: Lazy = { + static FILE_SEQUENCE_NUMBER: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 4, @@ -374,7 +378,7 @@ mod _const_schema { }) }; - pub static CONTENT: Lazy = { + static CONTENT: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 134, @@ -384,7 +388,7 @@ mod _const_schema { }) }; - pub static FILE_PATH: Lazy = { + static FILE_PATH: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 100, @@ -394,7 +398,7 @@ mod _const_schema { }) }; - pub static FILE_FORMAT: Lazy = { + static FILE_FORMAT: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 101, @@ -404,7 +408,7 @@ mod _const_schema { }) }; - pub static RECORD_COUNT: Lazy = { + static RECORD_COUNT: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 103, @@ -414,7 +418,7 @@ mod _const_schema { }) }; - pub static FILE_SIZE_IN_BYTES: Lazy = { + static FILE_SIZE_IN_BYTES: Lazy = { Lazy::new(|| { 
Arc::new(NestedField::required( 104, @@ -425,7 +429,7 @@ mod _const_schema { }; // Deprecated. Always write a default in v1. Do not write in v2. - pub static BLOCK_SIZE_IN_BYTES: Lazy = { + static BLOCK_SIZE_IN_BYTES: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 105, @@ -435,7 +439,7 @@ mod _const_schema { }) }; - pub static COLUMN_SIZES: Lazy = { + static COLUMN_SIZES: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 108, @@ -456,7 +460,7 @@ mod _const_schema { }) }; - pub static VALUE_COUNTS: Lazy = { + static VALUE_COUNTS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 109, @@ -477,7 +481,7 @@ mod _const_schema { }) }; - pub static NULL_VALUE_COUNTS: Lazy = { + static NULL_VALUE_COUNTS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 110, @@ -498,7 +502,7 @@ mod _const_schema { }) }; - pub static NAN_VALUE_COUNTS: Lazy = { + static NAN_VALUE_COUNTS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 137, @@ -519,7 +523,7 @@ mod _const_schema { }) }; - pub static DISTINCT_COUNTS: Lazy = { + static DISTINCT_COUNTS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 111, @@ -540,7 +544,7 @@ mod _const_schema { }) }; - pub static LOWER_BOUNDS: Lazy = { + static LOWER_BOUNDS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 125, @@ -561,7 +565,7 @@ mod _const_schema { }) }; - pub static UPPER_BOUNDS: Lazy = { + static UPPER_BOUNDS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 128, @@ -582,7 +586,7 @@ mod _const_schema { }) }; - pub static KEY_METADATA: Lazy = { + static KEY_METADATA: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 131, @@ -592,7 +596,7 @@ mod _const_schema { }) }; - pub static SPLIT_OFFSETS: Lazy = { + static SPLIT_OFFSETS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 132, @@ -608,7 +612,7 @@ mod _const_schema { }) }; - pub static EQUALITY_IDS: Lazy = { + static EQUALITY_IDS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 135, @@ -624,7 +628,7 @@ mod _const_schema { }) }; - pub static SORT_ORDER_ID: Lazy = { + static SORT_ORDER_ID: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 140, @@ -906,11 +910,11 @@ pub struct DataFile { /// field id: 103 /// /// Number of records in this file - record_count: i64, + record_count: u64, /// field id: 104 /// /// Total file size in bytes - file_size_in_bytes: i64, + file_size_in_bytes: u64, /// field id: 108 /// key field id: 117 /// value field id: 118 @@ -918,26 +922,26 @@ pub struct DataFile { /// Map from column id to the total size on disk of all regions that /// store the column. Does not include bytes necessary to read other /// columns, like footers. 
Leave null for row-oriented formats (Avro) - column_sizes: Option>, + column_sizes: Option>, /// field id: 109 /// key field id: 119 /// value field id: 120 /// /// Map from column id to number of values in the column (including null /// and NaN values) - value_counts: Option>, + value_counts: Option>, /// field id: 110 /// key field id: 121 /// value field id: 122 /// /// Map from column id to number of null values in the column - null_value_counts: Option>, + null_value_counts: Option>, /// field id: 137 /// key field id: 138 /// value field id: 139 /// /// Map from column id to number of NaN values in the column - nan_value_counts: Option>, + nan_value_counts: Option>, /// field id: 111 /// key field id: 123 /// value field id: 124 @@ -946,7 +950,7 @@ pub struct DataFile { /// distinct counts must be derived using values in the file by counting /// or using sketches, but not using methods like merging existing /// distinct counts - distinct_counts: Option>, + distinct_counts: Option>, /// field id: 125 /// key field id: 126 /// value field id: 127 @@ -1193,14 +1197,14 @@ mod _serde { Literal::Struct(value.partition), &Type::Struct(partition_type.clone()), )?, - record_count: value.record_count, - file_size_in_bytes: value.file_size_in_bytes, + record_count: value.record_count.try_into()?, + file_size_in_bytes: value.file_size_in_bytes.try_into()?, block_size_in_bytes, - column_sizes: value.column_sizes.map(to_i64_entry), - value_counts: value.value_counts.map(to_i64_entry), - null_value_counts: value.null_value_counts.map(to_i64_entry), - nan_value_counts: value.nan_value_counts.map(to_i64_entry), - distinct_counts: value.distinct_counts.map(to_i64_entry), + column_sizes: value.column_sizes.map(to_i64_entry).transpose()?, + value_counts: value.value_counts.map(to_i64_entry).transpose()?, + null_value_counts: value.null_value_counts.map(to_i64_entry).transpose()?, + nan_value_counts: value.nan_value_counts.map(to_i64_entry).transpose()?, + distinct_counts: value.distinct_counts.map(to_i64_entry).transpose()?, lower_bounds: value.lower_bounds.map(to_bytes_entry), upper_bounds: value.upper_bounds.map(to_bytes_entry), key_metadata: value.key_metadata.map(serde_bytes::ByteBuf::from), @@ -1234,13 +1238,13 @@ mod _serde { file_path: self.file_path, file_format: self.file_format.parse()?, partition, - record_count: self.record_count, - file_size_in_bytes: self.file_size_in_bytes, - column_sizes: self.column_sizes.map(parse_i64_entry), - value_counts: self.value_counts.map(parse_i64_entry), - null_value_counts: self.null_value_counts.map(parse_i64_entry), - nan_value_counts: self.nan_value_counts.map(parse_i64_entry), - distinct_counts: self.distinct_counts.map(parse_i64_entry), + record_count: self.record_count.try_into()?, + file_size_in_bytes: self.file_size_in_bytes.try_into()?, + column_sizes: self.column_sizes.map(parse_i64_entry).transpose()?, + value_counts: self.value_counts.map(parse_i64_entry).transpose()?, + null_value_counts: self.null_value_counts.map(parse_i64_entry).transpose()?, + nan_value_counts: self.nan_value_counts.map(parse_i64_entry).transpose()?, + distinct_counts: self.distinct_counts.map(parse_i64_entry).transpose()?, lower_bounds: self .lower_bounds .map(|v| parse_bytes_entry(v, schema)) @@ -1301,20 +1305,22 @@ mod _serde { value: i64, } - fn parse_i64_entry(v: Vec) -> HashMap { + fn parse_i64_entry(v: Vec) -> Result, Error> { let mut m = HashMap::with_capacity(v.len()); for entry in v { - m.insert(entry.key, entry.value); + m.insert(entry.key, 
entry.value.try_into()?); } - m + Ok(m) } - fn to_i64_entry(entries: HashMap) -> Vec { + fn to_i64_entry(entries: HashMap) -> Result, Error> { entries .iter() - .map(|e| I64Entry { - key: *e.0, - value: *e.1, + .map(|e| { + Ok(I64Entry { + key: *e.0, + value: (*e.1).try_into()?, + }) }) .collect() } @@ -1806,7 +1812,7 @@ mod tests { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); let writer = ManifestWriter::new(output_file, 1, 1, None); - let entry = writer.write(manifest.clone()).await.unwrap(); + let entry = writer.write(manifest.clone()).await.unwrap().unwrap(); // Check partition summary assert_eq!(entry.partitions.len(), 1); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 7790ac223..e9d5b80f7 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -30,9 +30,6 @@ use self::{ use super::{FormatVersion, StructType}; -/// The seq number when no added files are present. -pub const UNASSIGNED_SEQ_NUMBER: i64 = -1; - /// Snapshots are embedded in table metadata, but the list of manifests for a /// snapshot are stored in a separate manifest list file. /// @@ -159,7 +156,7 @@ impl ManifestListWriter { match self.format_version { FormatVersion::V1 => { for manifest_entry in manifest_entries { - let manifest_entry: ManifestListEntryV1 = manifest_entry.into(); + let manifest_entry: ManifestListEntryV1 = manifest_entry.try_into()?; self.avro_writer.append_ser(manifest_entry)?; } } @@ -502,32 +499,32 @@ pub struct ManifestListEntry { /// /// Number of entries in the manifest that have status ADDED, when null /// this is assumed to be non-zero - pub added_data_files_count: Option, + pub added_data_files_count: Option, /// field: 505 /// /// Number of entries in the manifest that have status EXISTING (0), /// when null this is assumed to be non-zero - pub existing_data_files_count: Option, + pub existing_data_files_count: Option, /// field: 506 /// /// Number of entries in the manifest that have status DELETED (2), /// when null this is assumed to be non-zero - pub deleted_data_files_count: Option, + pub deleted_data_files_count: Option, /// field: 512 /// /// Number of rows in all of files in the manifest that have status /// ADDED, when null this is assumed to be non-zero - pub added_rows_count: Option, + pub added_rows_count: Option, /// field: 513 /// /// Number of rows in all of files in the manifest that have status /// EXISTING, when null this is assumed to be non-zero - pub existing_rows_count: Option, + pub existing_rows_count: Option, /// field: 514 /// /// Number of rows in all of files in the manifest that have status /// DELETED, when null this is assumed to be non-zero - pub deleted_rows_count: Option, + pub deleted_rows_count: Option, /// field: 507 /// element_field: 508 /// @@ -685,11 +682,17 @@ pub(super) mod _serde { } } - impl From for ManifestListV1 { - fn from(value: super::ManifestList) -> Self { - Self { - entries: value.entries.into_iter().map(Into::into).collect(), - } + impl TryFrom for ManifestListV1 { + type Error = Error; + + fn try_from(value: super::ManifestList) -> Result { + Ok(Self { + entries: value + .entries + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + }) } } @@ -796,12 +799,12 @@ pub(super) mod _serde { sequence_number: self.sequence_number, min_sequence_number: self.min_sequence_number, added_snapshot_id: self.added_snapshot_id, - added_data_files_count: 
Some(self.added_data_files_count), - existing_data_files_count: Some(self.existing_data_files_count), - deleted_data_files_count: Some(self.deleted_data_files_count), - added_rows_count: Some(self.added_rows_count), - existing_rows_count: Some(self.existing_rows_count), - deleted_rows_count: Some(self.deleted_rows_count), + added_data_files_count: Some(self.added_data_files_count.try_into()?), + existing_data_files_count: Some(self.existing_data_files_count.try_into()?), + deleted_data_files_count: Some(self.deleted_data_files_count.try_into()?), + added_rows_count: Some(self.added_rows_count.try_into()?), + existing_rows_count: Some(self.existing_rows_count.try_into()?), + deleted_rows_count: Some(self.deleted_rows_count.try_into()?), partitions, key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), }) @@ -818,12 +821,24 @@ pub(super) mod _serde { manifest_length: self.manifest_length, partition_spec_id: self.partition_spec_id, added_snapshot_id: self.added_snapshot_id, - added_data_files_count: self.added_data_files_count, - existing_data_files_count: self.existing_data_files_count, - deleted_data_files_count: self.deleted_data_files_count, - added_rows_count: self.added_rows_count, - existing_rows_count: self.existing_rows_count, - deleted_rows_count: self.deleted_rows_count, + added_data_files_count: self + .added_data_files_count + .map(TryInto::try_into) + .transpose()?, + existing_data_files_count: self + .existing_data_files_count + .map(TryInto::try_into) + .transpose()?, + deleted_data_files_count: self + .deleted_data_files_count + .map(TryInto::try_into) + .transpose()?, + added_rows_count: self.added_rows_count.map(TryInto::try_into).transpose()?, + existing_rows_count: self + .existing_rows_count + .map(TryInto::try_into) + .transpose()?, + deleted_rows_count: self.deleted_rows_count.map(TryInto::try_into).transpose()?, partitions, key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), // as ref: https://iceberg.apache.org/spec/#partitioning @@ -877,66 +892,101 @@ pub(super) mod _serde { sequence_number: value.sequence_number, min_sequence_number: value.min_sequence_number, added_snapshot_id: value.added_snapshot_id, - added_data_files_count: value.added_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "added_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - existing_data_files_count: value.existing_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "existing_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - deleted_data_files_count: value.deleted_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "deleted_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - added_rows_count: value.added_rows_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "added_rows_count in ManifestListEntryV2 should be require", - ) - })?, - existing_rows_count: value.existing_rows_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "existing_rows_count in ManifestListEntryV2 should be require", - ) - })?, - deleted_rows_count: value.deleted_rows_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "deleted_rows_count in ManifestListEntryV2 should be require", - ) - })?, + added_data_files_count: value + .added_data_files_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "added_data_files_count in ManifestListEntryV2 should 
be require", + ) + })? + .try_into()?, + existing_data_files_count: value + .existing_data_files_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "existing_data_files_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + deleted_data_files_count: value + .deleted_data_files_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "deleted_data_files_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + added_rows_count: value + .added_rows_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "added_rows_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + existing_rows_count: value + .existing_rows_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "existing_rows_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + deleted_rows_count: value + .deleted_rows_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "deleted_rows_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, partitions, key_metadata, }) } } - impl From for ManifestListEntryV1 { - fn from(value: ManifestListEntry) -> Self { + impl TryFrom for ManifestListEntryV1 { + type Error = Error; + + fn try_from(value: ManifestListEntry) -> Result { let partitions = convert_to_serde_field_summary(value.partitions); let key_metadata = convert_to_serde_key_metadata(value.key_metadata); - Self { + Ok(Self { manifest_path: value.manifest_path, manifest_length: value.manifest_length, partition_spec_id: value.partition_spec_id, added_snapshot_id: value.added_snapshot_id, - added_data_files_count: value.added_data_files_count, - existing_data_files_count: value.existing_data_files_count, - deleted_data_files_count: value.deleted_data_files_count, - added_rows_count: value.added_rows_count, - existing_rows_count: value.existing_rows_count, - deleted_rows_count: value.deleted_rows_count, + added_data_files_count: value + .added_data_files_count + .map(TryInto::try_into) + .transpose()?, + existing_data_files_count: value + .existing_data_files_count + .map(TryInto::try_into) + .transpose()?, + deleted_data_files_count: value + .deleted_data_files_count + .map(TryInto::try_into) + .transpose()?, + added_rows_count: value.added_rows_count.map(TryInto::try_into).transpose()?, + existing_rows_count: value + .existing_rows_count + .map(TryInto::try_into) + .transpose()?, + deleted_rows_count: value + .deleted_rows_count + .map(TryInto::try_into) + .transpose()?, partitions, key_metadata, - } + }) } } } @@ -1060,7 +1110,7 @@ mod test { partitions: vec![], key_metadata: vec![], }] - }.into(); + }.try_into().unwrap(); let result = serde_json::to_string(&manifest_list).unwrap(); assert_eq!( result, From 007079379a5f3d87a59d076c26ab15a84ade6e6b Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 13 Dec 2023 13:16:26 +0800 Subject: [PATCH 3/7] fix seq num assign --- crates/iceberg/src/spec/manifest.rs | 86 +++++++++++------------- crates/iceberg/src/spec/manifest_list.rs | 65 +++++++++++++++--- 2 files changed, 94 insertions(+), 57 deletions(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 5540d32de..623f417a2 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -18,11 +18,11 @@ //! Manifest for Iceberg. 
use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; -use super::Literal; use super::{ FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema, Struct, }; +use super::{Literal, UNASSIGNED_SEQUENCE_NUMBER}; use crate::io::OutputFile; use crate::spec::PartitionField; use crate::{Error, ErrorKind}; @@ -94,7 +94,6 @@ pub struct ManifestWriter { deleted_files: u32, deleted_rows: u64, - seq_num: i64, min_seq_num: Option, key_metadata: Option>, @@ -104,12 +103,7 @@ pub struct ManifestWriter { impl ManifestWriter { /// Create a new manifest writer. - pub fn new( - output: OutputFile, - snapshot_id: i64, - seq_num: i64, - key_metadata: Option>, - ) -> Self { + pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Option>) -> Self { Self { output, snapshot_id, @@ -119,7 +113,6 @@ impl ManifestWriter { existing_rows: 0, deleted_files: 0, deleted_rows: 0, - seq_num, min_seq_num: None, key_metadata, field_summary: HashMap::new(), @@ -186,7 +179,7 @@ impl ManifestWriter { } /// Write a manifest entry. - pub async fn write(mut self, manifest: Manifest) -> Result, Error> { + pub async fn write(mut self, manifest: Manifest) -> Result { // Create the avro writer let partition_type = manifest .metadata @@ -222,7 +215,7 @@ impl ManifestWriter { )?; avro_writer.add_user_metadata( "format-version".to_string(), - manifest.metadata.format_version.to_string(), + (manifest.metadata.format_version as u8).to_string(), )?; if manifest.metadata.format_version == FormatVersion::V2 { avro_writer @@ -231,6 +224,15 @@ impl ManifestWriter { // Write manifest entries for entry in manifest.entries { + if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing) + && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Manifest entry with status Existing or Deleted should have sequence number", + )); + } + match entry.status { ManifestStatus::Added => { self.added_files += 1; @@ -247,15 +249,8 @@ impl ManifestWriter { } if entry.is_alive() { - if let Some(cur_min_seq_num) = self.min_seq_num { - self.min_seq_num = Some( - entry - .sequence_number - .map(|v| min(v, cur_min_seq_num)) - .unwrap_or(cur_min_seq_num), - ); - } else { - self.min_seq_num = entry.sequence_number; + if let Some(seq_num) = entry.sequence_number { + self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num))); } } @@ -288,28 +283,25 @@ impl ManifestWriter { let partition_summary = self.get_field_summary_vec(&manifest.metadata.partition_spec.fields); - if let Some(min_sequence_number) = self.min_seq_num { - Ok(Some(ManifestListEntry { - manifest_path: self.output.location().to_string(), - manifest_length: length as i64, - partition_spec_id: manifest.metadata.partition_spec.spec_id, - content: manifest.metadata.content, - sequence_number: self.seq_num, - min_sequence_number, - added_snapshot_id: self.snapshot_id, - added_data_files_count: Some(self.added_files), - existing_data_files_count: Some(self.existing_files), - deleted_data_files_count: Some(self.deleted_files), - added_rows_count: Some(self.added_rows), - existing_rows_count: Some(self.existing_rows), - deleted_rows_count: Some(self.deleted_rows), - partitions: partition_summary, - key_metadata: self.key_metadata.unwrap_or_default(), - })) - } else { - // All entries are deleted - Ok(None) - } + Ok(ManifestListEntry { + manifest_path: self.output.location().to_string(), + manifest_length: length as i64, + 
partition_spec_id: manifest.metadata.partition_spec.spec_id, + content: manifest.metadata.content, + // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replaced with + // the real sequence number in `ManifestListWriter`. + sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQUENCE_NUMBER), + added_snapshot_id: self.snapshot_id, + added_data_files_count: Some(self.added_files), + existing_data_files_count: Some(self.existing_files), + deleted_data_files_count: Some(self.deleted_files), + added_rows_count: Some(self.added_rows), + existing_rows_count: Some(self.existing_rows), + deleted_rows_count: Some(self.deleted_rows), + partitions: partition_summary, + key_metadata: self.key_metadata.unwrap_or_default(), + }) } } @@ -1811,8 +1803,8 @@ mod tests { let path = temp_dir.path().join("manifest_list_v1.avro"); let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let writer = ManifestWriter::new(output_file, 1, 1, None); - let entry = writer.write(manifest.clone()).await.unwrap().unwrap(); + let writer = ManifestWriter::new(output_file, 1, None); + let entry = writer.write(manifest.clone()).await.unwrap(); // Check partition summary assert_eq!(entry.partitions.len(), 1); @@ -1841,8 +1833,10 @@ mod tests { let path = temp_dir.path().join("manifest_list_v2.avro"); let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let writer = ManifestWriter::new(output_file, 1, 1, None); - let _ = writer.write(manifest.clone()).await.unwrap(); + let writer = ManifestWriter::new(output_file, 1, None); + let res = writer.write(manifest.clone()).await.unwrap(); + assert_eq!(res.sequence_number, UNASSIGNED_SEQUENCE_NUMBER); + assert_eq!(res.min_sequence_number, 1); // Verify manifest let bs = fs::read(path).expect("read_file must succeed"); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index e9d5b80f7..db0c30c2c 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -30,6 +30,9 @@ use self::{ use super::{FormatVersion, StructType}; +/// Placeholder for a sequence number. The field with this value must be replaced with the actual sequence number before it is written. +pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1; + /// Snapshots are embedded in table metadata, but the list of manifests for a /// snapshot are stored in a separate manifest list file. /// @@ -81,6 +84,8 @@ pub struct ManifestListWriter { format_version: FormatVersion, output_file: OutputFile, avro_writer: Writer<'static, Vec<u8>>, + sequence_number: i64, + snapshot_id: i64, } impl std::fmt::Debug for ManifestListWriter { @@ -104,7 +109,7 @@ impl ManifestListWriter { ), ("format-version".to_string(), "1".to_string()), ]); - Self::new(FormatVersion::V1, output_file, metadata) + Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id) } /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
@@ -123,13 +128,21 @@ impl ManifestListWriter { ("sequence-number".to_string(), sequence_number.to_string()), ("format-version".to_string(), "2".to_string()), ]); - Self::new(FormatVersion::V2, output_file, metadata) + Self::new( + FormatVersion::V2, + output_file, + metadata, + sequence_number, + snapshot_id, + ) } fn new( format_version: FormatVersion, output_file: OutputFile, metadata: HashMap, + sequence_number: i64, + snapshot_id: i64, ) -> Self { let avro_schema = match format_version { FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1, @@ -145,6 +158,8 @@ impl ManifestListWriter { format_version, output_file, avro_writer, + sequence_number, + snapshot_id, } } @@ -161,7 +176,31 @@ impl ManifestListWriter { } } FormatVersion::V2 => { - for manifest_entry in manifest_entries { + for mut manifest_entry in manifest_entries { + if manifest_entry.sequence_number == UNASSIGNED_SEQUENCE_NUMBER { + if manifest_entry.added_snapshot_id != self.snapshot_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Found unassigned sequence number for a manifest from snapshot {}.", + manifest_entry.added_snapshot_id + ), + )); + } + manifest_entry.sequence_number = self.sequence_number; + } + if manifest_entry.min_sequence_number == UNASSIGNED_SEQUENCE_NUMBER { + if manifest_entry.added_snapshot_id != self.snapshot_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Found unassigned sequence number for a manifest from snapshot {}.", + manifest_entry.added_snapshot_id + ), + )); + } + manifest_entry.min_sequence_number = self.sequence_number; + } let manifest_entry: ManifestListEntryV2 = manifest_entry.try_into()?; self.avro_writer.append_ser(manifest_entry)?; } @@ -1000,9 +1039,9 @@ mod test { use crate::{ io::FileIOBuilder, spec::{ - manifest_list::_serde::ManifestListV1, FieldSummary, Literal, ManifestContentType, - ManifestList, ManifestListEntry, ManifestListWriter, NestedField, PrimitiveType, - StructType, Type, + manifest_list::{_serde::ManifestListV1, UNASSIGNED_SEQUENCE_NUMBER}, + FieldSummary, Literal, ManifestContentType, ManifestList, ManifestListEntry, + ManifestListWriter, NestedField, PrimitiveType, StructType, Type, }, }; @@ -1197,15 +1236,17 @@ mod test { #[tokio::test] async fn test_manifest_list_writer_v2() { - let expected_manifest_list = ManifestList { + let snapshot_id = 377075049360453639; + let seq_num = 1; + let mut expected_manifest_list = ManifestList { entries: vec![ManifestListEntry { manifest_path: "s3a://icebergdata/demo/s1/t1/metadata/05ffe08b-810f-49b3-a8f4-e88fc99b254a-m0.avro".to_string(), manifest_length: 6926, partition_spec_id: 1, content: ManifestContentType::Data, - sequence_number: 1, - min_sequence_number: 1, - added_snapshot_id: 377075049360453639, + sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + added_snapshot_id: snapshot_id, added_data_files_count: Some(1), existing_data_files_count: Some(0), deleted_data_files_count: Some(0), @@ -1222,7 +1263,7 @@ mod test { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, 377075049360453639, 0, 1); + let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num); writer .add_manifest_entries(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1239,6 +1280,8 @@ mod test { ))]), ) .unwrap(); + expected_manifest_list.entries[0].sequence_number = seq_num; + 
expected_manifest_list.entries[0].min_sequence_number = seq_num; assert_eq!(manifest_list, expected_manifest_list); temp_dir.close().unwrap(); From fc80ffa88d6d30d39a426201f0febf977233c2b3 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 13 Dec 2023 13:16:54 +0800 Subject: [PATCH 4/7] ignore typos testdata --- .typos.toml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .typos.toml diff --git a/.typos.toml b/.typos.toml new file mode 100644 index 000000000..df9fda0c4 --- /dev/null +++ b/.typos.toml @@ -0,0 +1,2 @@ +[files] +extend-exclude = ["crates/iceberg/testdata"] From 251ad7769af85891e240c327b493adc9c04cc4a2 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 13 Dec 2023 15:48:07 +0800 Subject: [PATCH 5/7] refine --- .typos.toml | 2 +- crates/iceberg/src/spec/manifest.rs | 243 ++++++++++++++------------- crates/iceberg/src/spec/partition.rs | 187 +++++++++++++++++++++ 3 files changed, 318 insertions(+), 114 deletions(-) diff --git a/.typos.toml b/.typos.toml index df9fda0c4..846d6999c 100644 --- a/.typos.toml +++ b/.typos.toml @@ -1,2 +1,2 @@ [files] -extend-exclude = ["crates/iceberg/testdata"] +extend-exclude = ["**/testdata"] diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 623f417a2..b469e8fdd 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -96,14 +96,14 @@ pub struct ManifestWriter { min_seq_num: Option, - key_metadata: Option>, + key_metadata: Vec, field_summary: HashMap, } impl ManifestWriter { /// Create a new manifest writer. - pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Option>) -> Self { + pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Vec) -> Self { Self { output, snapshot_id, @@ -121,47 +121,42 @@ impl ManifestWriter { fn update_field_summary(&mut self, entry: &ManifestEntry) { // Update field summary - if let Some(null) = &entry.data_file.null_value_counts { - for (&k, &v) in null { - let field_summary = self.field_summary.entry(k).or_default(); - if v > 0 { - field_summary.contains_null = true; - } + for (&k, &v) in &entry.data_file.null_value_counts { + let field_summary = self.field_summary.entry(k).or_default(); + if v > 0 { + field_summary.contains_null = true; } } - if let Some(nan) = &entry.data_file.nan_value_counts { - for (&k, &v) in nan { - let field_summary = self.field_summary.entry(k).or_default(); - if v > 0 { - field_summary.contains_nan = Some(true); - } - if v == 0 { - field_summary.contains_nan = Some(false); - } + + for (&k, &v) in &entry.data_file.nan_value_counts { + let field_summary = self.field_summary.entry(k).or_default(); + if v > 0 { + field_summary.contains_nan = Some(true); + } + if v == 0 { + field_summary.contains_nan = Some(false); } } - if let Some(lower_bound) = &entry.data_file.lower_bounds { - for (&k, v) in lower_bound { - let field_summary = self.field_summary.entry(k).or_default(); - if let Some(cur) = &field_summary.lower_bound { - if v < cur { - field_summary.lower_bound = Some(v.clone()); - } - } else { + + for (&k, v) in &entry.data_file.lower_bounds { + let field_summary = self.field_summary.entry(k).or_default(); + if let Some(cur) = &field_summary.lower_bound { + if v < cur { field_summary.lower_bound = Some(v.clone()); } + } else { + field_summary.lower_bound = Some(v.clone()); } } - if let Some(upper_bound) = &entry.data_file.upper_bounds { - for (&k, v) in upper_bound { - let field_summary = self.field_summary.entry(k).or_default(); - if let Some(cur) = &field_summary.upper_bound { - if v > 
cur { - field_summary.upper_bound = Some(v.clone()); - } - } else { + + for (&k, v) in &entry.data_file.upper_bounds { + let field_summary = self.field_summary.entry(k).or_default(); + if let Some(cur) = &field_summary.upper_bound { + if v > cur { field_summary.upper_bound = Some(v.clone()); } + } else { + field_summary.upper_bound = Some(v.clone()); } } } @@ -300,7 +295,7 @@ impl ManifestWriter { existing_rows_count: Some(self.existing_rows), deleted_rows_count: Some(self.deleted_rows), partitions: partition_summary, - key_metadata: self.key_metadata.unwrap_or_default(), + key_metadata: self.key_metadata, }) } } @@ -630,7 +625,7 @@ mod _const_schema { }) }; - pub fn manifest_schema_v2(partition_type: StructType) -> Result { + pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result { let fields = vec![ STATUS.clone(), SNAPSHOT_ID_V2.clone(), @@ -668,7 +663,7 @@ mod _const_schema { schema_to_avro_schema("manifest", &schema) } - pub fn manifest_schema_v1(partition_type: StructType) -> Result { + pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result { let fields = vec![ STATUS.clone(), SNAPSHOT_ID_V1.clone(), @@ -914,26 +909,26 @@ pub struct DataFile { /// Map from column id to the total size on disk of all regions that /// store the column. Does not include bytes necessary to read other /// columns, like footers. Leave null for row-oriented formats (Avro) - column_sizes: Option>, + column_sizes: HashMap, /// field id: 109 /// key field id: 119 /// value field id: 120 /// /// Map from column id to number of values in the column (including null /// and NaN values) - value_counts: Option>, + value_counts: HashMap, /// field id: 110 /// key field id: 121 /// value field id: 122 /// /// Map from column id to number of null values in the column - null_value_counts: Option>, + null_value_counts: HashMap, /// field id: 137 /// key field id: 138 /// value field id: 139 /// /// Map from column id to number of NaN values in the column - nan_value_counts: Option>, + nan_value_counts: HashMap, /// field id: 111 /// key field id: 123 /// value field id: 124 @@ -942,7 +937,7 @@ pub struct DataFile { /// distinct counts must be derived using values in the file by counting /// or using sketches, but not using methods like merging existing /// distinct counts - distinct_counts: Option>, + distinct_counts: HashMap, /// field id: 125 /// key field id: 126 /// value field id: 127 @@ -954,7 +949,7 @@ pub struct DataFile { /// Reference: /// /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization) - lower_bounds: Option>, + lower_bounds: HashMap, /// field id: 128 /// key field id: 129 /// value field id: 130 @@ -966,17 +961,17 @@ pub struct DataFile { /// Reference: /// /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization) - upper_bounds: Option>, + upper_bounds: HashMap, /// field id: 131 /// /// Implementation-specific key metadata for encryption - key_metadata: Option>, + key_metadata: Vec, /// field id: 132 /// element field id: 133 /// /// Split offsets for the data file. For example, all row group offsets /// in a Parquet file. Must be sorted ascending - split_offsets: Option>, + split_offsets: Vec, /// field id: 135 /// element field id: 136 /// @@ -984,7 +979,7 @@ pub struct DataFile { /// Required when content is EqualityDeletes and should be null /// otherwise. 
Fields with ids listed in this column must be present /// in the delete file - equality_ids: Option>, + equality_ids: Vec, /// field id: 140 /// /// ID representing sort order for this file. @@ -1192,16 +1187,16 @@ mod _serde { record_count: value.record_count.try_into()?, file_size_in_bytes: value.file_size_in_bytes.try_into()?, block_size_in_bytes, - column_sizes: value.column_sizes.map(to_i64_entry).transpose()?, - value_counts: value.value_counts.map(to_i64_entry).transpose()?, - null_value_counts: value.null_value_counts.map(to_i64_entry).transpose()?, - nan_value_counts: value.nan_value_counts.map(to_i64_entry).transpose()?, - distinct_counts: value.distinct_counts.map(to_i64_entry).transpose()?, - lower_bounds: value.lower_bounds.map(to_bytes_entry), - upper_bounds: value.upper_bounds.map(to_bytes_entry), - key_metadata: value.key_metadata.map(serde_bytes::ByteBuf::from), - split_offsets: value.split_offsets, - equality_ids: value.equality_ids, + column_sizes: Some(to_i64_entry(value.column_sizes)?), + value_counts: Some(to_i64_entry(value.value_counts)?), + null_value_counts: Some(to_i64_entry(value.null_value_counts)?), + nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?), + distinct_counts: Some(to_i64_entry(value.distinct_counts)?), + lower_bounds: Some(to_bytes_entry(value.lower_bounds)), + upper_bounds: Some(to_bytes_entry(value.upper_bounds)), + key_metadata: Some(serde_bytes::ByteBuf::from(value.key_metadata)), + split_offsets: Some(value.split_offsets), + equality_ids: Some(value.equality_ids), sort_order_id: value.sort_order_id, }) } @@ -1232,22 +1227,44 @@ mod _serde { partition, record_count: self.record_count.try_into()?, file_size_in_bytes: self.file_size_in_bytes.try_into()?, - column_sizes: self.column_sizes.map(parse_i64_entry).transpose()?, - value_counts: self.value_counts.map(parse_i64_entry).transpose()?, - null_value_counts: self.null_value_counts.map(parse_i64_entry).transpose()?, - nan_value_counts: self.nan_value_counts.map(parse_i64_entry).transpose()?, - distinct_counts: self.distinct_counts.map(parse_i64_entry).transpose()?, + column_sizes: self + .column_sizes + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), + value_counts: self + .value_counts + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), + null_value_counts: self + .null_value_counts + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), + nan_value_counts: self + .nan_value_counts + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), + distinct_counts: self + .distinct_counts + .map(parse_i64_entry) + .transpose()? + .unwrap_or_default(), lower_bounds: self .lower_bounds .map(|v| parse_bytes_entry(v, schema)) - .transpose()?, + .transpose()? + .unwrap_or_default(), upper_bounds: self .upper_bounds .map(|v| parse_bytes_entry(v, schema)) - .transpose()?, - key_metadata: self.key_metadata.map(|v| v.to_vec()), - split_offsets: self.split_offsets, - equality_ids: self.equality_ids, + .transpose()? 
+ .unwrap_or_default(), + key_metadata: self.key_metadata.map(|v| v.to_vec()).unwrap_or_default(), + split_offsets: self.split_offsets.unwrap_or_default(), + equality_ids: self.equality_ids.unwrap_or_default(), sort_order_id: self.sort_order_id, }) } @@ -1426,16 +1443,16 @@ mod tests { partition: Struct::empty(), record_count: 1, file_size_in_bytes: 5442, - column_sizes: Some(HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)])), - value_counts: Some(HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)])), - null_value_counts: Some(HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)])), - nan_value_counts: None, - distinct_counts: None, - lower_bounds: None, - upper_bounds: None, - key_metadata: None, - split_offsets: Some(vec![4]), - equality_ids: None, + column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]), + value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]), + null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]), + nan_value_counts: HashMap::new(), + distinct_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Vec::new(), + split_offsets: vec![4], + equality_ids: Vec::new(), sort_order_id: None, } ); @@ -1558,7 +1575,7 @@ mod tests { assert_eq!(entry.data_file.file_size_in_bytes, 5442); assert_eq!( entry.data_file.column_sizes, - Some(HashMap::from([ + HashMap::from([ (0, 73), (6, 34), (2, 73), @@ -1570,11 +1587,11 @@ mod tests { (1, 61), (4, 73), (8, 73) - ])) + ]) ); assert_eq!( entry.data_file.value_counts, - Some(HashMap::from([ + HashMap::from([ (4, 1), (5, 1), (2, 1), @@ -1586,11 +1603,11 @@ mod tests { (10, 1), (7, 1), (9, 1) - ])) + ]) ); assert_eq!( entry.data_file.null_value_counts, - Some(HashMap::from([ + HashMap::from([ (1, 0), (6, 0), (2, 0), @@ -1602,15 +1619,15 @@ mod tests { (7, 0), (4, 0), (10, 0) - ])) + ]) ); - assert_eq!(entry.data_file.nan_value_counts, None); - assert_eq!(entry.data_file.distinct_counts, None); - assert_eq!(entry.data_file.lower_bounds, None); - assert_eq!(entry.data_file.upper_bounds, None); - assert_eq!(entry.data_file.key_metadata, None); - assert_eq!(entry.data_file.split_offsets, Some(vec![4])); - assert_eq!(entry.data_file.equality_ids, None); + assert!(entry.data_file.nan_value_counts.is_empty()); + assert!(entry.data_file.distinct_counts.is_empty()); + assert!(entry.data_file.lower_bounds.is_empty()); + assert!(entry.data_file.upper_bounds.is_empty()); + assert!(entry.data_file.key_metadata.is_empty()); + assert_eq!(entry.data_file.split_offsets, vec![4]); + assert!(entry.data_file.equality_ids.is_empty()); assert_eq!(entry.data_file.sort_order_id, None); } @@ -1666,16 +1683,16 @@ mod tests { partition: Struct::empty(), record_count: 1, file_size_in_bytes: 875, - column_sizes: Some(HashMap::from([(1,47),(2,48),(3,52)])), - value_counts: Some(HashMap::from([(1,1),(2,1),(3,1)])), - null_value_counts: Some(HashMap::from([(1,0),(2,0),(3,0)])), - nan_value_counts: Some(HashMap::new()), - distinct_counts: None, - lower_bounds: Some(HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))])), - upper_bounds: Some(HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))])), - key_metadata: None, - split_offsets: Some(vec![4]), - equality_ids: None, + column_sizes: 
HashMap::from([(1,47),(2,48),(3,52)]), + value_counts: HashMap::from([(1,1),(2,1),(3,1)]), + null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]), + nan_value_counts: HashMap::new(), + distinct_counts: HashMap::new(), + lower_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), + upper_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), + key_metadata: vec![], + split_offsets: vec![4], + equality_ids: vec![], sort_order_id: Some(0), } ); @@ -1754,37 +1771,37 @@ mod tests { assert_eq!(entry.data_file.file_size_in_bytes, 874); assert_eq!( entry.data_file.column_sizes, - Some(HashMap::from([(1, 46), (2, 48), (3, 48)])) + HashMap::from([(1, 46), (2, 48), (3, 48)]) ); assert_eq!( entry.data_file.value_counts, - Some(HashMap::from([(1, 1), (2, 1), (3, 1)])) + HashMap::from([(1, 1), (2, 1), (3, 1)]) ); assert_eq!( entry.data_file.null_value_counts, - Some(HashMap::from([(1, 0), (2, 0), (3, 0)])) + HashMap::from([(1, 0), (2, 0), (3, 0)]) ); - assert_eq!(entry.data_file.nan_value_counts, Some(HashMap::new())); - assert_eq!(entry.data_file.distinct_counts, None); + assert_eq!(entry.data_file.nan_value_counts, HashMap::new()); + assert_eq!(entry.data_file.distinct_counts, HashMap::new()); assert_eq!( entry.data_file.lower_bounds, - Some(HashMap::from([ + HashMap::from([ (1, Literal::long(1)), (2, Literal::string("a")), (3, Literal::string("x")) - ])) + ]) ); assert_eq!( entry.data_file.upper_bounds, - Some(HashMap::from([ + HashMap::from([ (1, Literal::long(1)), (2, Literal::string("a")), (3, Literal::string("x")) - ])) + ]) ); - assert_eq!(entry.data_file.key_metadata, None); - assert_eq!(entry.data_file.split_offsets, Some(vec![4])); - assert_eq!(entry.data_file.equality_ids, None); + assert!(entry.data_file.key_metadata.is_empty()); + assert_eq!(entry.data_file.split_offsets, vec![4]); + assert!(entry.data_file.equality_ids.is_empty()); assert_eq!(entry.data_file.sort_order_id, Some(0)); } @@ -1803,7 +1820,7 @@ mod tests { let path = temp_dir.path().join("manifest_list_v1.avro"); let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let writer = ManifestWriter::new(output_file, 1, None); + let writer = ManifestWriter::new(output_file, 1, vec![]); let entry = writer.write(manifest.clone()).await.unwrap(); // Check partition summary @@ -1833,7 +1850,7 @@ mod tests { let path = temp_dir.path().join("manifest_list_v2.avro"); let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let writer = ManifestWriter::new(output_file, 1, None); + let writer = ManifestWriter::new(output_file, 1, vec![]); let res = writer.write(manifest.clone()).await.unwrap(); assert_eq!(res.sequence_number, UNASSIGNED_SEQUENCE_NUMBER); assert_eq!(res.min_sequence_number, 1); diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index fcc7efae8..9388820a2 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -137,6 +137,8 @@ impl UnboundPartitionSpec { #[cfg(test)] mod tests { + use crate::spec::Type; + use super::*; #[test] @@ -302,4 +304,189 @@ mod tests { assert_eq!("ts_day", partition_spec.fields[0].name); assert_eq!(Transform::Day, partition_spec.fields[0].transform); } + + #[test] + fn test_partition_type() { + let spec = r#" + { + "spec-id": 1, + "fields": [ { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + 
"transform": "day" + }, { + "source-id": 1, + "field-id": 1001, + "name": "id_bucket", + "transform": "bucket[16]" + }, { + "source-id": 2, + "field-id": 1002, + "name": "id_truncate", + "transform": "truncate[4]" + } ] + } + "#; + + let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + NestedField::required( + 3, + "ts", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into(), + NestedField::required( + 4, + "ts_day", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into(), + NestedField::required( + 5, + "id_bucket", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + NestedField::required( + 6, + "id_truncate", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_type = partition_spec.partition_type(&schema).unwrap(); + assert_eq!(3, partition_type.fields().len()); + assert_eq!( + *partition_type.fields()[0], + NestedField::optional( + partition_spec.fields[0].field_id, + &partition_spec.fields[0].name, + Type::Primitive(crate::spec::PrimitiveType::Int) + ) + ); + assert_eq!( + *partition_type.fields()[1], + NestedField::optional( + partition_spec.fields[1].field_id, + &partition_spec.fields[1].name, + Type::Primitive(crate::spec::PrimitiveType::Int) + ) + ); + assert_eq!( + *partition_type.fields()[2], + NestedField::optional( + partition_spec.fields[2].field_id, + &partition_spec.fields[2].name, + Type::Primitive(crate::spec::PrimitiveType::String) + ) + ); + } + + #[test] + fn test_partition_empty() { + let spec = r#" + { + "spec-id": 1, + "fields": [] + } + "#; + + let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + NestedField::required( + 3, + "ts", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into(), + NestedField::required( + 4, + "ts_day", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into(), + NestedField::required( + 5, + "id_bucket", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + NestedField::required( + 6, + "id_truncate", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_type = partition_spec.partition_type(&schema).unwrap(); + assert_eq!(0, partition_type.fields().len()); + } + + #[test] + fn test_partition_error() { + let spec = r#" + { + "spec-id": 1, + "fields": [ { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + }, { + "source-id": 1, + "field-id": 1001, + "name": "id_bucket", + "transform": "bucket[16]" + }, { + "source-id": 2, + "field-id": 1002, + "name": "id_truncate", + "transform": "truncate[4]" + } ] + } + "#; + + let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + 
.build() + .unwrap(); + + assert!(partition_spec.partition_type(&schema).is_err()); + } } From 70d3b31998c821a15ad97259aa16c2fb3861261a Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 13 Dec 2023 16:09:24 +0800 Subject: [PATCH 6/7] add license for typos.toml --- .typos.toml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/.typos.toml b/.typos.toml index 846d6999c..93a9713e7 100644 --- a/.typos.toml +++ b/.typos.toml @@ -1,2 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + [files] extend-exclude = ["**/testdata"] From 6781ba94f245d14e75f602a08c2176cfcb171d7c Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 13 Dec 2023 17:32:40 +0800 Subject: [PATCH 7/7] remove distinct_counts --- crates/iceberg/src/spec/manifest.rs | 43 ----------------------------- 1 file changed, 43 deletions(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index b469e8fdd..bc12edff7 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -510,27 +510,6 @@ mod _const_schema { }) }; - static DISTINCT_COUNTS: Lazy = { - Lazy::new(|| { - Arc::new(NestedField::optional( - 111, - "distinct_counts", - Type::Map(MapType { - key_field: Arc::new(NestedField::required( - 123, - "key", - Type::Primitive(PrimitiveType::Int), - )), - value_field: Arc::new(NestedField::required( - 124, - "value", - Type::Primitive(PrimitiveType::Long), - )), - }), - )) - }) - }; - static LOWER_BOUNDS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( @@ -649,7 +628,6 @@ mod _const_schema { VALUE_COUNTS.clone(), NULL_VALUE_COUNTS.clone(), NAN_VALUE_COUNTS.clone(), - DISTINCT_COUNTS.clone(), LOWER_BOUNDS.clone(), UPPER_BOUNDS.clone(), KEY_METADATA.clone(), @@ -685,7 +663,6 @@ mod _const_schema { VALUE_COUNTS.clone(), NULL_VALUE_COUNTS.clone(), NAN_VALUE_COUNTS.clone(), - DISTINCT_COUNTS.clone(), LOWER_BOUNDS.clone(), UPPER_BOUNDS.clone(), KEY_METADATA.clone(), @@ -929,15 +906,6 @@ pub struct DataFile { /// /// Map from column id to number of NaN values in the column nan_value_counts: HashMap, - /// field id: 111 - /// key field id: 123 - /// value field id: 124 - /// - /// Map from column id to number of distinct values in the column; - /// distinct counts must be derived using values in the file by counting - /// or using sketches, but not using methods like merging existing - /// distinct counts - distinct_counts: HashMap, /// field id: 125 /// key field id: 126 /// value field id: 127 @@ -1159,7 +1127,6 @@ mod _serde { value_counts: Option>, null_value_counts: Option>, nan_value_counts: Option>, - distinct_counts: Option>, lower_bounds: Option>, upper_bounds: Option>, key_metadata: Option, @@ -1191,7 +1158,6 @@ mod _serde { value_counts: Some(to_i64_entry(value.value_counts)?), null_value_counts: 
Some(to_i64_entry(value.null_value_counts)?), nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?), - distinct_counts: Some(to_i64_entry(value.distinct_counts)?), lower_bounds: Some(to_bytes_entry(value.lower_bounds)), upper_bounds: Some(to_bytes_entry(value.upper_bounds)), key_metadata: Some(serde_bytes::ByteBuf::from(value.key_metadata)), @@ -1247,11 +1213,6 @@ mod _serde { .map(parse_i64_entry) .transpose()? .unwrap_or_default(), - distinct_counts: self - .distinct_counts - .map(parse_i64_entry) - .transpose()? - .unwrap_or_default(), lower_bounds: self .lower_bounds .map(|v| parse_bytes_entry(v, schema)) @@ -1447,7 +1408,6 @@ mod tests { value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]), null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]), nan_value_counts: HashMap::new(), - distinct_counts: HashMap::new(), lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), key_metadata: Vec::new(), @@ -1622,7 +1582,6 @@ mod tests { ]) ); assert!(entry.data_file.nan_value_counts.is_empty()); - assert!(entry.data_file.distinct_counts.is_empty()); assert!(entry.data_file.lower_bounds.is_empty()); assert!(entry.data_file.upper_bounds.is_empty()); assert!(entry.data_file.key_metadata.is_empty()); @@ -1687,7 +1646,6 @@ mod tests { value_counts: HashMap::from([(1,1),(2,1),(3,1)]), null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]), nan_value_counts: HashMap::new(), - distinct_counts: HashMap::new(), lower_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), upper_bounds: HashMap::from([(1,Literal::int(1)),(2,Literal::string("a")),(3,Literal::string("AC/DC"))]), key_metadata: vec![], @@ -1782,7 +1740,6 @@ mod tests { HashMap::from([(1, 0), (2, 0), (3, 0)]) ); assert_eq!(entry.data_file.nan_value_counts, HashMap::new()); - assert_eq!(entry.data_file.distinct_counts, HashMap::new()); assert_eq!( entry.data_file.lower_bounds, HashMap::from([
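
A note for readers skimming the serde hunks above: the recurring
`map(parse_i64_entry).transpose()?.unwrap_or_default()` chain is the whole
Option-to-default story in this series. The Avro-facing struct keeps every
metrics map optional, while the in-memory `DataFile` holds plain maps and
treats an absent map as empty. Below is a minimal, self-contained sketch of
that idiom; `SerdeDataFile`, `DataFile` (as written here), and
`parse_entries` are hypothetical stand-ins, not the crate's real serde types.

use std::collections::HashMap;

// Hypothetical stand-ins; the crate's real types carry many more fields.
struct SerdeDataFile {
    null_value_counts: Option<Vec<(i32, i64)>>,
}

struct DataFile {
    null_value_counts: HashMap<i32, i64>,
}

// Stands in for a fallible per-entry conversion like `parse_i64_entry` above.
fn parse_entries(entries: Vec<(i32, i64)>) -> Result<HashMap<i32, i64>, String> {
    Ok(entries.into_iter().collect())
}

impl SerdeDataFile {
    fn try_into_data_file(self) -> Result<DataFile, String> {
        Ok(DataFile {
            // Parse the map only when present (`map` + `transpose()?`, which
            // still surfaces parse errors), then treat a missing map as empty.
            null_value_counts: self
                .null_value_counts
                .map(parse_entries)
                .transpose()?
                .unwrap_or_default(),
        })
    }
}

impl DataFile {
    fn into_serde(self) -> SerdeDataFile {
        SerdeDataFile {
            // On the write side the map is wrapped back into `Some`, so the
            // on-disk field stays optional even though memory never is.
            null_value_counts: Some(self.null_value_counts.into_iter().collect()),
        }
    }
}

fn main() -> Result<(), String> {
    let absent = SerdeDataFile { null_value_counts: None };
    assert!(absent.try_into_data_file()?.null_value_counts.is_empty());

    let present = SerdeDataFile { null_value_counts: Some(vec![(1, 0), (2, 0)]) };
    let in_memory = present.try_into_data_file()?;
    assert_eq!(in_memory.null_value_counts.len(), 2);
    let round_trip = in_memory.into_serde();
    assert_eq!(round_trip.null_value_counts.map(|v| v.len()), Some(2));
    Ok(())
}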
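The new partition.rs tests pin down how a partition struct type is derived
from a spec plus a schema: each partition field is resolved against the
schema by source-id, the transform's result type is computed from the source
field's type, and an unknown source-id is the error case covered by
test_partition_error. A rough sketch of that derivation under simplified
stand-in types (hypothetical names throughout; `day` and `bucket[n]`
producing Int here simply mirrors the assertions in test_partition_type,
and in the real crate every resulting field is additionally optional):

#[derive(Clone, Debug, PartialEq)]
enum PrimitiveType { Int, String, Timestamp }

enum Transform { Day, Bucket(u32), Truncate(u32) }

struct SchemaField { id: i32, ty: PrimitiveType }

struct PartitionField { source_id: i32, field_id: i32, name: &'static str, transform: Transform }

// Result type of applying a transform to a source column's type.
fn result_type(transform: &Transform, source: &PrimitiveType) -> PrimitiveType {
    match transform {
        Transform::Day | Transform::Bucket(_) => PrimitiveType::Int,
        Transform::Truncate(_) => source.clone(),
    }
}

// Resolve each partition field by source-id and emit its
// (field-id, name, type) triple; fail on an unknown source-id.
fn partition_type(
    spec: &[PartitionField],
    schema: &[SchemaField],
) -> Result<Vec<(i32, String, PrimitiveType)>, String> {
    spec.iter()
        .map(|pf| {
            let source = schema
                .iter()
                .find(|f| f.id == pf.source_id)
                .ok_or_else(|| format!("no schema field with id {}", pf.source_id))?;
            Ok((pf.field_id, pf.name.to_string(), result_type(&pf.transform, &source.ty)))
        })
        .collect()
}

fn main() {
    let schema = vec![
        SchemaField { id: 1, ty: PrimitiveType::Int },
        SchemaField { id: 2, ty: PrimitiveType::String },
        SchemaField { id: 4, ty: PrimitiveType::Timestamp },
    ];
    let spec = vec![
        PartitionField { source_id: 4, field_id: 1000, name: "ts_day", transform: Transform::Day },
        PartitionField { source_id: 1, field_id: 1001, name: "id_bucket", transform: Transform::Bucket(16) },
        PartitionField { source_id: 2, field_id: 1002, name: "name_truncate", transform: Transform::Truncate(4) },
    ];
    let ty = partition_type(&spec, &schema).unwrap();
    assert_eq!(ty[0], (1000, "ts_day".to_string(), PrimitiveType::Int));
    assert_eq!(ty[2], (1002, "name_truncate".to_string(), PrimitiveType::String));

    // The failure mode exercised by test_partition_error.
    let bad = vec![PartitionField { source_id: 99, field_id: 1003, name: "x", transform: Transform::Day }];
    assert!(partition_type(&bad, &schema).is_err());
}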