From ff82174bbbd12d675894aba116256624947caaff Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 12 Dec 2023 14:48:44 +0800 Subject: [PATCH] refine in memory struct --- crates/iceberg/src/spec/manifest.rs | 160 +++++++++--------- crates/iceberg/src/spec/manifest_list.rs | 198 ++++++++++++++--------- 2 files changed, 207 insertions(+), 151 deletions(-) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 2a5d58728..5540d32de 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -21,7 +21,7 @@ use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; use super::Literal; use super::{ FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema, - Struct, UNASSIGNED_SEQ_NUMBER, + Struct, }; use crate::io::OutputFile; use crate::spec::PartitionField; @@ -87,12 +87,12 @@ pub struct ManifestWriter { snapshot_id: i64, - added_files: i32, - added_rows: i64, - existing_files: i32, - existing_rows: i64, - deleted_files: i32, - deleted_rows: i64, + added_files: u32, + added_rows: u64, + existing_files: u32, + existing_rows: u64, + deleted_files: u32, + deleted_rows: u64, seq_num: i64, min_seq_num: Option, @@ -139,7 +139,6 @@ impl ManifestWriter { if let Some(nan) = &entry.data_file.nan_value_counts { for (&k, &v) in nan { let field_summary = self.field_summary.entry(k).or_default(); - assert!(v >= 0); if v > 0 { field_summary.contains_nan = Some(true); } @@ -187,7 +186,7 @@ impl ManifestWriter { } /// Write a manifest entry. - pub async fn write(mut self, manifest: Manifest) -> Result { + pub async fn write(mut self, manifest: Manifest) -> Result, Error> { // Create the avro writer let partition_type = manifest .metadata @@ -289,23 +288,28 @@ impl ManifestWriter { let partition_summary = self.get_field_summary_vec(&manifest.metadata.partition_spec.fields); - Ok(ManifestListEntry { - manifest_path: self.output.location().to_string(), - manifest_length: length as i64, - partition_spec_id: manifest.metadata.partition_spec.spec_id, - content: manifest.metadata.content, - sequence_number: self.seq_num, - min_sequence_number: self.min_seq_num.unwrap_or(UNASSIGNED_SEQ_NUMBER), - added_snapshot_id: self.snapshot_id, - added_data_files_count: Some(self.added_files), - existing_data_files_count: Some(self.existing_files), - deleted_data_files_count: Some(self.deleted_files), - added_rows_count: Some(self.added_rows), - existing_rows_count: Some(self.existing_rows), - deleted_rows_count: Some(self.deleted_rows), - partitions: partition_summary, - key_metadata: self.key_metadata.unwrap_or_default(), - }) + if let Some(min_sequence_number) = self.min_seq_num { + Ok(Some(ManifestListEntry { + manifest_path: self.output.location().to_string(), + manifest_length: length as i64, + partition_spec_id: manifest.metadata.partition_spec.spec_id, + content: manifest.metadata.content, + sequence_number: self.seq_num, + min_sequence_number, + added_snapshot_id: self.snapshot_id, + added_data_files_count: Some(self.added_files), + existing_data_files_count: Some(self.existing_files), + deleted_data_files_count: Some(self.deleted_files), + added_rows_count: Some(self.added_rows), + existing_rows_count: Some(self.existing_rows), + deleted_rows_count: Some(self.deleted_rows), + partitions: partition_summary, + key_metadata: self.key_metadata.unwrap_or_default(), + })) + } else { + // All entries are deleted + Ok(None) + } } } @@ -324,7 +328,7 @@ mod _const_schema { Error, }; - pub static STATUS: Lazy = { + static STATUS: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 0, @@ -334,7 +338,7 @@ mod _const_schema { }) }; - pub static SNAPSHOT_ID_V1: Lazy = { + static SNAPSHOT_ID_V1: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 1, @@ -344,7 +348,7 @@ mod _const_schema { }) }; - pub static SNAPSHOT_ID_V2: Lazy = { + static SNAPSHOT_ID_V2: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 1, @@ -354,7 +358,7 @@ mod _const_schema { }) }; - pub static SEQUENCE_NUMBER: Lazy = { + static SEQUENCE_NUMBER: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 3, @@ -364,7 +368,7 @@ mod _const_schema { }) }; - pub static FILE_SEQUENCE_NUMBER: Lazy = { + static FILE_SEQUENCE_NUMBER: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 4, @@ -374,7 +378,7 @@ mod _const_schema { }) }; - pub static CONTENT: Lazy = { + static CONTENT: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 134, @@ -384,7 +388,7 @@ mod _const_schema { }) }; - pub static FILE_PATH: Lazy = { + static FILE_PATH: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 100, @@ -394,7 +398,7 @@ mod _const_schema { }) }; - pub static FILE_FORMAT: Lazy = { + static FILE_FORMAT: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 101, @@ -404,7 +408,7 @@ mod _const_schema { }) }; - pub static RECORD_COUNT: Lazy = { + static RECORD_COUNT: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 103, @@ -414,7 +418,7 @@ mod _const_schema { }) }; - pub static FILE_SIZE_IN_BYTES: Lazy = { + static FILE_SIZE_IN_BYTES: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 104, @@ -425,7 +429,7 @@ mod _const_schema { }; // Deprecated. Always write a default in v1. Do not write in v2. - pub static BLOCK_SIZE_IN_BYTES: Lazy = { + static BLOCK_SIZE_IN_BYTES: Lazy = { Lazy::new(|| { Arc::new(NestedField::required( 105, @@ -435,7 +439,7 @@ mod _const_schema { }) }; - pub static COLUMN_SIZES: Lazy = { + static COLUMN_SIZES: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 108, @@ -456,7 +460,7 @@ mod _const_schema { }) }; - pub static VALUE_COUNTS: Lazy = { + static VALUE_COUNTS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 109, @@ -477,7 +481,7 @@ mod _const_schema { }) }; - pub static NULL_VALUE_COUNTS: Lazy = { + static NULL_VALUE_COUNTS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 110, @@ -498,7 +502,7 @@ mod _const_schema { }) }; - pub static NAN_VALUE_COUNTS: Lazy = { + static NAN_VALUE_COUNTS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 137, @@ -519,7 +523,7 @@ mod _const_schema { }) }; - pub static DISTINCT_COUNTS: Lazy = { + static DISTINCT_COUNTS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 111, @@ -540,7 +544,7 @@ mod _const_schema { }) }; - pub static LOWER_BOUNDS: Lazy = { + static LOWER_BOUNDS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 125, @@ -561,7 +565,7 @@ mod _const_schema { }) }; - pub static UPPER_BOUNDS: Lazy = { + static UPPER_BOUNDS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 128, @@ -582,7 +586,7 @@ mod _const_schema { }) }; - pub static KEY_METADATA: Lazy = { + static KEY_METADATA: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 131, @@ -592,7 +596,7 @@ mod _const_schema { }) }; - pub static SPLIT_OFFSETS: Lazy = { + static SPLIT_OFFSETS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 132, @@ -608,7 +612,7 @@ mod _const_schema { }) }; - pub static EQUALITY_IDS: Lazy = { + static EQUALITY_IDS: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 135, @@ -624,7 +628,7 @@ mod _const_schema { }) }; - pub static SORT_ORDER_ID: Lazy = { + static SORT_ORDER_ID: Lazy = { Lazy::new(|| { Arc::new(NestedField::optional( 140, @@ -906,11 +910,11 @@ pub struct DataFile { /// field id: 103 /// /// Number of records in this file - record_count: i64, + record_count: u64, /// field id: 104 /// /// Total file size in bytes - file_size_in_bytes: i64, + file_size_in_bytes: u64, /// field id: 108 /// key field id: 117 /// value field id: 118 @@ -918,26 +922,26 @@ pub struct DataFile { /// Map from column id to the total size on disk of all regions that /// store the column. Does not include bytes necessary to read other /// columns, like footers. Leave null for row-oriented formats (Avro) - column_sizes: Option>, + column_sizes: Option>, /// field id: 109 /// key field id: 119 /// value field id: 120 /// /// Map from column id to number of values in the column (including null /// and NaN values) - value_counts: Option>, + value_counts: Option>, /// field id: 110 /// key field id: 121 /// value field id: 122 /// /// Map from column id to number of null values in the column - null_value_counts: Option>, + null_value_counts: Option>, /// field id: 137 /// key field id: 138 /// value field id: 139 /// /// Map from column id to number of NaN values in the column - nan_value_counts: Option>, + nan_value_counts: Option>, /// field id: 111 /// key field id: 123 /// value field id: 124 @@ -946,7 +950,7 @@ pub struct DataFile { /// distinct counts must be derived using values in the file by counting /// or using sketches, but not using methods like merging existing /// distinct counts - distinct_counts: Option>, + distinct_counts: Option>, /// field id: 125 /// key field id: 126 /// value field id: 127 @@ -1193,14 +1197,14 @@ mod _serde { Literal::Struct(value.partition), &Type::Struct(partition_type.clone()), )?, - record_count: value.record_count, - file_size_in_bytes: value.file_size_in_bytes, + record_count: value.record_count.try_into()?, + file_size_in_bytes: value.file_size_in_bytes.try_into()?, block_size_in_bytes, - column_sizes: value.column_sizes.map(to_i64_entry), - value_counts: value.value_counts.map(to_i64_entry), - null_value_counts: value.null_value_counts.map(to_i64_entry), - nan_value_counts: value.nan_value_counts.map(to_i64_entry), - distinct_counts: value.distinct_counts.map(to_i64_entry), + column_sizes: value.column_sizes.map(to_i64_entry).transpose()?, + value_counts: value.value_counts.map(to_i64_entry).transpose()?, + null_value_counts: value.null_value_counts.map(to_i64_entry).transpose()?, + nan_value_counts: value.nan_value_counts.map(to_i64_entry).transpose()?, + distinct_counts: value.distinct_counts.map(to_i64_entry).transpose()?, lower_bounds: value.lower_bounds.map(to_bytes_entry), upper_bounds: value.upper_bounds.map(to_bytes_entry), key_metadata: value.key_metadata.map(serde_bytes::ByteBuf::from), @@ -1234,13 +1238,13 @@ mod _serde { file_path: self.file_path, file_format: self.file_format.parse()?, partition, - record_count: self.record_count, - file_size_in_bytes: self.file_size_in_bytes, - column_sizes: self.column_sizes.map(parse_i64_entry), - value_counts: self.value_counts.map(parse_i64_entry), - null_value_counts: self.null_value_counts.map(parse_i64_entry), - nan_value_counts: self.nan_value_counts.map(parse_i64_entry), - distinct_counts: self.distinct_counts.map(parse_i64_entry), + record_count: self.record_count.try_into()?, + file_size_in_bytes: self.file_size_in_bytes.try_into()?, + column_sizes: self.column_sizes.map(parse_i64_entry).transpose()?, + value_counts: self.value_counts.map(parse_i64_entry).transpose()?, + null_value_counts: self.null_value_counts.map(parse_i64_entry).transpose()?, + nan_value_counts: self.nan_value_counts.map(parse_i64_entry).transpose()?, + distinct_counts: self.distinct_counts.map(parse_i64_entry).transpose()?, lower_bounds: self .lower_bounds .map(|v| parse_bytes_entry(v, schema)) @@ -1301,20 +1305,22 @@ mod _serde { value: i64, } - fn parse_i64_entry(v: Vec) -> HashMap { + fn parse_i64_entry(v: Vec) -> Result, Error> { let mut m = HashMap::with_capacity(v.len()); for entry in v { - m.insert(entry.key, entry.value); + m.insert(entry.key, entry.value.try_into()?); } - m + Ok(m) } - fn to_i64_entry(entries: HashMap) -> Vec { + fn to_i64_entry(entries: HashMap) -> Result, Error> { entries .iter() - .map(|e| I64Entry { - key: *e.0, - value: *e.1, + .map(|e| { + Ok(I64Entry { + key: *e.0, + value: (*e.1).try_into()?, + }) }) .collect() } @@ -1806,7 +1812,7 @@ mod tests { let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); let writer = ManifestWriter::new(output_file, 1, 1, None); - let entry = writer.write(manifest.clone()).await.unwrap(); + let entry = writer.write(manifest.clone()).await.unwrap().unwrap(); // Check partition summary assert_eq!(entry.partitions.len(), 1); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 7790ac223..e9d5b80f7 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -30,9 +30,6 @@ use self::{ use super::{FormatVersion, StructType}; -/// The seq number when no added files are present. -pub const UNASSIGNED_SEQ_NUMBER: i64 = -1; - /// Snapshots are embedded in table metadata, but the list of manifests for a /// snapshot are stored in a separate manifest list file. /// @@ -159,7 +156,7 @@ impl ManifestListWriter { match self.format_version { FormatVersion::V1 => { for manifest_entry in manifest_entries { - let manifest_entry: ManifestListEntryV1 = manifest_entry.into(); + let manifest_entry: ManifestListEntryV1 = manifest_entry.try_into()?; self.avro_writer.append_ser(manifest_entry)?; } } @@ -502,32 +499,32 @@ pub struct ManifestListEntry { /// /// Number of entries in the manifest that have status ADDED, when null /// this is assumed to be non-zero - pub added_data_files_count: Option, + pub added_data_files_count: Option, /// field: 505 /// /// Number of entries in the manifest that have status EXISTING (0), /// when null this is assumed to be non-zero - pub existing_data_files_count: Option, + pub existing_data_files_count: Option, /// field: 506 /// /// Number of entries in the manifest that have status DELETED (2), /// when null this is assumed to be non-zero - pub deleted_data_files_count: Option, + pub deleted_data_files_count: Option, /// field: 512 /// /// Number of rows in all of files in the manifest that have status /// ADDED, when null this is assumed to be non-zero - pub added_rows_count: Option, + pub added_rows_count: Option, /// field: 513 /// /// Number of rows in all of files in the manifest that have status /// EXISTING, when null this is assumed to be non-zero - pub existing_rows_count: Option, + pub existing_rows_count: Option, /// field: 514 /// /// Number of rows in all of files in the manifest that have status /// DELETED, when null this is assumed to be non-zero - pub deleted_rows_count: Option, + pub deleted_rows_count: Option, /// field: 507 /// element_field: 508 /// @@ -685,11 +682,17 @@ pub(super) mod _serde { } } - impl From for ManifestListV1 { - fn from(value: super::ManifestList) -> Self { - Self { - entries: value.entries.into_iter().map(Into::into).collect(), - } + impl TryFrom for ManifestListV1 { + type Error = Error; + + fn try_from(value: super::ManifestList) -> Result { + Ok(Self { + entries: value + .entries + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + }) } } @@ -796,12 +799,12 @@ pub(super) mod _serde { sequence_number: self.sequence_number, min_sequence_number: self.min_sequence_number, added_snapshot_id: self.added_snapshot_id, - added_data_files_count: Some(self.added_data_files_count), - existing_data_files_count: Some(self.existing_data_files_count), - deleted_data_files_count: Some(self.deleted_data_files_count), - added_rows_count: Some(self.added_rows_count), - existing_rows_count: Some(self.existing_rows_count), - deleted_rows_count: Some(self.deleted_rows_count), + added_data_files_count: Some(self.added_data_files_count.try_into()?), + existing_data_files_count: Some(self.existing_data_files_count.try_into()?), + deleted_data_files_count: Some(self.deleted_data_files_count.try_into()?), + added_rows_count: Some(self.added_rows_count.try_into()?), + existing_rows_count: Some(self.existing_rows_count.try_into()?), + deleted_rows_count: Some(self.deleted_rows_count.try_into()?), partitions, key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), }) @@ -818,12 +821,24 @@ pub(super) mod _serde { manifest_length: self.manifest_length, partition_spec_id: self.partition_spec_id, added_snapshot_id: self.added_snapshot_id, - added_data_files_count: self.added_data_files_count, - existing_data_files_count: self.existing_data_files_count, - deleted_data_files_count: self.deleted_data_files_count, - added_rows_count: self.added_rows_count, - existing_rows_count: self.existing_rows_count, - deleted_rows_count: self.deleted_rows_count, + added_data_files_count: self + .added_data_files_count + .map(TryInto::try_into) + .transpose()?, + existing_data_files_count: self + .existing_data_files_count + .map(TryInto::try_into) + .transpose()?, + deleted_data_files_count: self + .deleted_data_files_count + .map(TryInto::try_into) + .transpose()?, + added_rows_count: self.added_rows_count.map(TryInto::try_into).transpose()?, + existing_rows_count: self + .existing_rows_count + .map(TryInto::try_into) + .transpose()?, + deleted_rows_count: self.deleted_rows_count.map(TryInto::try_into).transpose()?, partitions, key_metadata: self.key_metadata.map(|b| b.into_vec()).unwrap_or_default(), // as ref: https://iceberg.apache.org/spec/#partitioning @@ -877,66 +892,101 @@ pub(super) mod _serde { sequence_number: value.sequence_number, min_sequence_number: value.min_sequence_number, added_snapshot_id: value.added_snapshot_id, - added_data_files_count: value.added_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "added_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - existing_data_files_count: value.existing_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "existing_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - deleted_data_files_count: value.deleted_data_files_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "deleted_data_files_count in ManifestListEntryV2 should be require", - ) - })?, - added_rows_count: value.added_rows_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "added_rows_count in ManifestListEntryV2 should be require", - ) - })?, - existing_rows_count: value.existing_rows_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "existing_rows_count in ManifestListEntryV2 should be require", - ) - })?, - deleted_rows_count: value.deleted_rows_count.ok_or_else(|| { - Error::new( - crate::ErrorKind::DataInvalid, - "deleted_rows_count in ManifestListEntryV2 should be require", - ) - })?, + added_data_files_count: value + .added_data_files_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "added_data_files_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + existing_data_files_count: value + .existing_data_files_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "existing_data_files_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + deleted_data_files_count: value + .deleted_data_files_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "deleted_data_files_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + added_rows_count: value + .added_rows_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "added_rows_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + existing_rows_count: value + .existing_rows_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "existing_rows_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, + deleted_rows_count: value + .deleted_rows_count + .ok_or_else(|| { + Error::new( + crate::ErrorKind::DataInvalid, + "deleted_rows_count in ManifestListEntryV2 should be require", + ) + })? + .try_into()?, partitions, key_metadata, }) } } - impl From for ManifestListEntryV1 { - fn from(value: ManifestListEntry) -> Self { + impl TryFrom for ManifestListEntryV1 { + type Error = Error; + + fn try_from(value: ManifestListEntry) -> Result { let partitions = convert_to_serde_field_summary(value.partitions); let key_metadata = convert_to_serde_key_metadata(value.key_metadata); - Self { + Ok(Self { manifest_path: value.manifest_path, manifest_length: value.manifest_length, partition_spec_id: value.partition_spec_id, added_snapshot_id: value.added_snapshot_id, - added_data_files_count: value.added_data_files_count, - existing_data_files_count: value.existing_data_files_count, - deleted_data_files_count: value.deleted_data_files_count, - added_rows_count: value.added_rows_count, - existing_rows_count: value.existing_rows_count, - deleted_rows_count: value.deleted_rows_count, + added_data_files_count: value + .added_data_files_count + .map(TryInto::try_into) + .transpose()?, + existing_data_files_count: value + .existing_data_files_count + .map(TryInto::try_into) + .transpose()?, + deleted_data_files_count: value + .deleted_data_files_count + .map(TryInto::try_into) + .transpose()?, + added_rows_count: value.added_rows_count.map(TryInto::try_into).transpose()?, + existing_rows_count: value + .existing_rows_count + .map(TryInto::try_into) + .transpose()?, + deleted_rows_count: value + .deleted_rows_count + .map(TryInto::try_into) + .transpose()?, partitions, key_metadata, - } + }) } } } @@ -1060,7 +1110,7 @@ mod test { partitions: vec![], key_metadata: vec![], }] - }.into(); + }.try_into().unwrap(); let result = serde_json::to_string(&manifest_list).unwrap(); assert_eq!( result,