From d1decdbb67042d2cf61d0c0b0f5aeae6923d110f Mon Sep 17 00:00:00 2001
From: ZENOTME <43447882+ZENOTME@users.noreply.github.com>
Date: Sat, 14 Dec 2024 15:51:25 +0800
Subject: [PATCH 1/6] fix: wrong compute of partitions in manifest (#794)

* fix partitions of manifest

* refine code

---------

Co-authored-by: ZENOTME
---
 crates/iceberg/src/spec/manifest.rs      | 332 +++++++++++++++++++----
 crates/iceberg/src/spec/manifest_list.rs |  12 +-
 2 files changed, 285 insertions(+), 59 deletions(-)

diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 13ecdc2e6..a4b4d7cdb 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 
 use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
 use bytes::Bytes;
+use itertools::Itertools;
 use serde_derive::{Deserialize, Serialize};
 use serde_json::to_vec;
 use serde_with::{DeserializeFromStr, SerializeDisplay};
@@ -31,7 +32,8 @@ use typed_builder::TypedBuilder;
 use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
 use super::{
     BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile,
-    Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER,
+    PrimitiveLiteral, PrimitiveType, Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER,
+    UNASSIGNED_SEQUENCE_NUMBER,
 };
 use crate::error::Result;
 use crate::io::OutputFile;
@@ -128,7 +130,69 @@ pub struct ManifestWriter {
 
     key_metadata: Vec<u8>,
 
-    field_summary: HashMap<i32, FieldSummary>,
+    partitions: Vec<Struct>,
 }
+
+struct PartitionFieldStats {
+    partition_type: PrimitiveType,
+    summary: FieldSummary,
+}
+
+impl PartitionFieldStats {
+    pub(crate) fn new(partition_type: PrimitiveType) -> Self {
+        Self {
+            partition_type,
+            summary: FieldSummary::default(),
+        }
+    }
+
+    pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
+        let Some(value) = value else {
+            self.summary.contains_null = true;
+            return Ok(());
+        };
+        if !self.partition_type.compatible(&value) {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "value is not compatible with type",
+            ));
+        }
+        let value = Datum::new(self.partition_type.clone(), value);
+
+        if value.is_nan() {
+            self.summary.contains_nan = Some(true);
+            return Ok(());
+        }
+
+        self.summary.lower_bound = Some(self.summary.lower_bound.take().map_or(
+            value.clone(),
+            |original| {
+                if value < original {
+                    value.clone()
+                } else {
+                    original
+                }
+            },
+        ));
+        self.summary.upper_bound = Some(self.summary.upper_bound.take().map_or(
+            value.clone(),
+            |original| {
+                if value > original {
+                    value
+                } else {
+                    original
+                }
+            },
+        ));
+
+        Ok(())
+    }
+
+    pub(crate) fn finish(mut self) -> FieldSummary {
+        // Always set contains_nan
+        self.summary.contains_nan = self.summary.contains_nan.or(Some(false));
+        self.summary
+    }
+}
 
 impl ManifestWriter {
@@ -145,62 +209,28 @@ impl ManifestWriter {
             deleted_rows: 0,
             min_seq_num: None,
             key_metadata,
-            field_summary: HashMap::new(),
+            partitions: vec![],
         }
     }
 
-    fn update_field_summary(&mut self, entry: &ManifestEntry) {
-        // Update field summary
-        for (&k, &v) in &entry.data_file.null_value_counts {
-            let field_summary = self.field_summary.entry(k).or_default();
-            if v > 0 {
-                field_summary.contains_null = true;
-            }
-        }
-
-        for (&k, &v) in &entry.data_file.nan_value_counts {
-            let field_summary = self.field_summary.entry(k).or_default();
-            if v > 0 {
-                field_summary.contains_nan = Some(true);
-            }
-            if v == 0 {
-                field_summary.contains_nan = Some(false);
-            }
-        }
-
-        for (&k, v) in &entry.data_file.lower_bounds {
-            let field_summary = self.field_summary.entry(k).or_default();
-            if let Some(cur) = &field_summary.lower_bound {
-                if v < cur {
-                    field_summary.lower_bound = Some(v.clone());
-                }
-            } else {
-                field_summary.lower_bound = Some(v.clone());
-            }
-        }
-
-        for (&k, v) in &entry.data_file.upper_bounds {
-            let field_summary = self.field_summary.entry(k).or_default();
-            if let Some(cur) = &field_summary.upper_bound {
-                if v > cur {
-                    field_summary.upper_bound = Some(v.clone());
-                }
-            } else {
-                field_summary.upper_bound = Some(v.clone());
+    fn construct_partition_summaries(
+        &mut self,
+        partition_spec: &BoundPartitionSpec,
+    ) -> Result<Vec<FieldSummary>> {
+        let partitions = std::mem::take(&mut self.partitions);
+        let mut field_stats: Vec<_> = partition_spec
+            .partition_type()
+            .fields()
+            .iter()
+            .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
+            .collect();
+        for partition in partitions {
+            for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) {
+                let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
+                stat.update(primitive_literal)?;
             }
         }
-    }
-
-    fn get_field_summary_vec(&mut self, spec_fields: &[PartitionField]) -> Vec<FieldSummary> {
-        let mut partition_summary = Vec::with_capacity(self.field_summary.len());
-        for field in spec_fields {
-            let entry = self
-                .field_summary
-                .remove(&field.source_id)
-                .unwrap_or_default();
-            partition_summary.push(entry);
-        }
-        partition_summary
+        Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
     }
 
     /// Write a manifest.
@@ -276,7 +306,7 @@ impl ManifestWriter {
             }
         }
 
-        self.update_field_summary(&entry);
+        self.partitions.push(entry.data_file.partition.clone());
 
         let value = match manifest.metadata.format_version {
             FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from(
@@ -299,7 +329,7 @@ impl ManifestWriter {
         self.output.write(Bytes::from(content)).await?;
 
         let partition_summary =
-            self.get_field_summary_vec(manifest.metadata.partition_spec.fields());
+            self.construct_partition_summaries(&manifest.metadata.partition_spec)?;
 
         Ok(ManifestFile {
             manifest_path: self.output.location().to_string(),
@@ -2086,6 +2116,198 @@ mod tests {
         assert_eq!(actual_manifest, expected_manifest);
     }
 
+    #[tokio::test]
+    async fn test_manifest_summary() {
+        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    Arc::new(NestedField::optional(
                        1,
                        "time",
                        Type::Primitive(PrimitiveType::Date),
                    )),
                    Arc::new(NestedField::optional(
                        2,
                        "v_float",
                        Type::Primitive(PrimitiveType::Float),
                    )),
                    Arc::new(NestedField::optional(
                        3,
                        "v_double",
                        Type::Primitive(PrimitiveType::Double),
                    )),
                ])
                .build()
                .unwrap(),
        );
        let partition_spec = BoundPartitionSpec::builder(schema.clone())
            .with_spec_id(0)
            .add_partition_field("time", "year_of_time", Transform::Year)
            .unwrap()
            .add_partition_field("v_float", "f", Transform::Identity)
            .unwrap()
            .add_partition_field("v_double", "d", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let manifest = Manifest {
            metadata: ManifestMetadata {
                schema_id: 0,
                schema,
                partition_spec,
                content: ManifestContentType::Data,
                format_version: FormatVersion::V2,
            },
            entries: vec![
                Arc::new(ManifestEntry {
                    status: ManifestStatus::Added,
                    snapshot_id: None,
                    sequence_number: None,
                    file_sequence_number: None,
                    data_file: DataFile {
                        content: DataContentType::Data,
                        file_path:
"s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::from_iter( + vec![ + Some(Literal::int(2021)), + Some(Literal::float(1.0)), + Some(Literal::double(2.0)), + ] + ), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]), + value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]), + null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Vec::new(), + split_offsets: vec![4], + equality_ids: Vec::new(), + sort_order_id: None, + } + }), + Arc::new( + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::from_iter( + vec![ + Some(Literal::int(1111)), + Some(Literal::float(15.5)), + Some(Literal::double(25.5)), + ] + ), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]), + value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]), + null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Vec::new(), + split_offsets: vec![4], + equality_ids: Vec::new(), + sort_order_id: None, + } + } + ), + Arc::new( + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::from_iter( + vec![ + Some(Literal::int(1211)), + Some(Literal::float(f32::NAN)), + Some(Literal::double(1.0)), + ] + ), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]), + value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]), + null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Vec::new(), + split_offsets: vec![4], + equality_ids: Vec::new(), + sort_order_id: None, + } + } + ), + Arc::new( + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::from_iter( + vec![ + Some(Literal::int(1111)), + None, + Some(Literal::double(11.0)), + ] + ), + record_count: 1, + file_size_in_bytes: 5442, + 
                        column_sizes: HashMap::from([(0,73),(6,34),(2,73),(7,61),(3,61),(5,62),(9,79),(10,73),(1,61),(4,73),(8,73)]),
                        value_counts: HashMap::from([(4,1),(5,1),(2,1),(0,1),(3,1),(6,1),(8,1),(1,1),(10,1),(7,1),(9,1)]),
                        null_value_counts: HashMap::from([(1,0),(6,0),(2,0),(8,0),(0,0),(3,0),(5,0),(9,0),(7,0),(4,0),(10,0)]),
                        nan_value_counts: HashMap::new(),
                        lower_bounds: HashMap::new(),
                        upper_bounds: HashMap::new(),
                        key_metadata: Vec::new(),
                        split_offsets: vec![4],
                        equality_ids: Vec::new(),
                        sort_order_id: None,
                    }
                }
                ),
            ]
        };
+
        let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
+
        let res = test_manifest_read_write(manifest, writer).await;
        assert!(res.partitions.len() == 3);
        assert!(res.partitions[0].lower_bound == Some(Datum::int(1111)));
        assert!(res.partitions[0].upper_bound == Some(Datum::int(2021)));
        assert!(!res.partitions[0].contains_null);
        assert!(res.partitions[0].contains_nan == Some(false));
+
        assert!(res.partitions[1].lower_bound == Some(Datum::float(1.0)));
        assert!(res.partitions[1].upper_bound == Some(Datum::float(15.5)));
        assert!(res.partitions[1].contains_null);
        assert!(res.partitions[1].contains_nan == Some(true));
+
        assert!(res.partitions[2].lower_bound == Some(Datum::double(1.0)));
        assert!(res.partitions[2].upper_bound == Some(Datum::double(25.5)));
        assert!(!res.partitions[2].contains_null);
        assert!(res.partitions[2].contains_nan == Some(false));
+    }
+
     async fn test_manifest_read_write(
         manifest: Manifest,
         writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index 6c898f89d..97d259ad3 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -857,12 +857,16 @@ pub(super) mod _serde {
                 contains_nan: self.contains_nan,
                 lower_bound: self
                     .lower_bound
-                    .map(|v| Datum::try_from_bytes(&v, r#type.clone()))
-                    .transpose()?,
+                    .as_ref()
+                    .map(|v| Datum::try_from_bytes(v, r#type.clone()))
+                    .transpose()
+                    .map_err(|err| err.with_context("type", format!("{:?}", r#type)))?,
                 upper_bound: self
                     .upper_bound
-                    .map(|v| Datum::try_from_bytes(&v, r#type.clone()))
-                    .transpose()?,
+                    .as_ref()
+                    .map(|v| Datum::try_from_bytes(v, r#type.clone()))
+                    .transpose()
+                    .map_err(|err| err.with_context("type", format!("{:?}", r#type)))?,
             })
         }
     }
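The accumulation rule PATCH 1 introduces can be reproduced in miniature. The sketch below is a standalone illustration, not the crate's API: `Summary` stands in for `FieldSummary`, plain `f64` values stand in for partition literals, and the logic mirrors `PartitionFieldStats::update`/`finish` — nulls set `contains_null`, NaN sets `contains_nan` without entering the bounds, and `contains_nan` is always pinned to `Some(false)` if no NaN was seen.

#[derive(Debug, Default, PartialEq)]
struct Summary {
    contains_null: bool,
    contains_nan: Option<bool>,
    lower_bound: Option<f64>,
    upper_bound: Option<f64>,
}

fn summarize(values: &[Option<f64>]) -> Summary {
    let mut s = Summary::default();
    for v in values {
        match v {
            // A null partition value only marks contains_null.
            None => s.contains_null = true,
            // NaN marks contains_nan and is excluded from the bounds.
            Some(v) if v.is_nan() => s.contains_nan = Some(true),
            Some(v) => {
                s.lower_bound = Some(s.lower_bound.map_or(*v, |lo| lo.min(*v)));
                s.upper_bound = Some(s.upper_bound.map_or(*v, |hi| hi.max(*v)));
            }
        }
    }
    // Mirror PartitionFieldStats::finish: contains_nan is always reported.
    s.contains_nan = s.contains_nan.or(Some(false));
    s
}

fn main() {
    // Same shape as the v_float column in test_manifest_summary above.
    let s = summarize(&[Some(1.0), Some(15.5), None, Some(f64::NAN)]);
    assert!(s.contains_null);
    assert_eq!(s.contains_nan, Some(true));
    assert_eq!((s.lower_bound, s.upper_bound), (Some(1.0), Some(15.5)));
    println!("{s:?}");
}

Computing these summaries from the partition tuple itself (rather than from column-level stats keyed by source field id, as the removed `update_field_summary` did) is what makes the manifest-list `partitions` field line up with the partition spec's fields.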
From 0ba444fc06241123f78eabca4965aa72fbeb469e Mon Sep 17 00:00:00 2001
From: feniljain <49019259+feniljain@users.noreply.github.com>
Date: Sat, 14 Dec 2024 15:57:17 +0530
Subject: [PATCH 2/6] fix: set key_metadata to Null by default (#800)

* fix: set key_metadata to Null by default

* fix: return Option<&[u8]> instead of &Option<Vec<u8>> for key_metadata

* test: use `None` instead of `Some` for key_metadata fields

* refactor: use as_deref instead of explicit ref/deref using map
---
 .../src/expr/visitors/expression_evaluator.rs |  4 +--
 .../visitors/inclusive_metrics_evaluator.rs   | 12 +++----
 crates/iceberg/src/io/object_cache.rs         |  1 +
 crates/iceberg/src/scan.rs                    |  1 +
 crates/iceberg/src/spec/manifest.rs           | 31 ++++++++++---------
 .../src/writer/file_writer/parquet_writer.rs  |  2 +-
 6 files changed, 27 insertions(+), 24 deletions(-)

diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
index 2add5761f..d451401c1 100644
--- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
@@ -338,7 +338,7 @@ mod tests {
             nan_value_counts: HashMap::new(),
             lower_bounds: HashMap::new(),
             upper_bounds: HashMap::new(),
-            key_metadata: vec![],
+            key_metadata: None,
             split_offsets: vec![],
             equality_ids: vec![],
             sort_order_id: None,
@@ -361,7 +361,7 @@ mod tests {
             nan_value_counts: HashMap::new(),
             lower_bounds: HashMap::new(),
             upper_bounds: HashMap::new(),
-            key_metadata: vec![],
+            key_metadata: None,
             split_offsets: vec![],
             equality_ids: vec![],
             sort_order_id: None,
diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
index 1cdc75771..7b04fae3a 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
@@ -1991,7 +1991,7 @@ mod test {
             nan_value_counts: Default::default(),
             lower_bounds: Default::default(),
             upper_bounds: Default::default(),
-            key_metadata: vec![],
+            key_metadata: None,
             split_offsets: vec![],
             equality_ids: vec![],
             sort_order_id: None,
@@ -2012,7 +2012,7 @@ mod test {
             nan_value_counts: Default::default(),
             lower_bounds: Default::default(),
             upper_bounds: Default::default(),
-            key_metadata: vec![],
+            key_metadata: None,
             split_offsets: vec![],
             equality_ids: vec![],
             sort_order_id: None,
@@ -2069,7 +2069,7 @@ mod test {
             ]),
 
             column_sizes: Default::default(),
-            key_metadata: vec![],
+            key_metadata: None,
             split_offsets: vec![],
             equality_ids: vec![],
             sort_order_id: None,
@@ -2095,7 +2095,7 @@ mod test {
             upper_bounds: HashMap::from([(3, Datum::string("dC"))]),
 
             column_sizes: Default::default(),
-            key_metadata: vec![],
+            key_metadata: None,
             split_offsets: vec![],
             equality_ids: vec![],
             sort_order_id: None,
@@ -2122,7 +2122,7 @@ mod test {
             upper_bounds: HashMap::from([(3, Datum::string("3str3"))]),
 
             column_sizes: Default::default(),
-            key_metadata: vec![],
+            key_metadata: None,
             split_offsets: vec![],
             equality_ids: vec![],
             sort_order_id: None,
@@ -2149,7 +2149,7 @@ mod test {
             upper_bounds: HashMap::from([(3, Datum::string("イロハニホヘト"))]),
 
             column_sizes: Default::default(),
-            key_metadata: vec![],
+            key_metadata: None,
             split_offsets: vec![],
             equality_ids: vec![],
             sort_order_id: None,
diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs
index 88e2d0e2d..809db33f5 100644
--- a/crates/iceberg/src/io/object_cache.rs
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -278,6 +278,7 @@ mod tests {
                     .file_size_in_bytes(100)
                     .record_count(1)
                     .partition(Struct::from_iter([Some(Literal::long(100))]))
+                    .key_metadata(None)
                     .build()
                     .unwrap(),
             )
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 8f0bc38f6..89cc21bbf 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -1073,6 +1073,7 @@ mod tests {
                     .file_size_in_bytes(100)
                     .record_count(1)
                     .partition(Struct::from_iter([Some(Literal::long(100))]))
+                    .key_metadata(None)
                     .build()
                     .unwrap(),
             )
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index a4b4d7cdb..60f5469cd 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -1074,7 +1074,7 @@ pub struct DataFile {
     ///
     /// Implementation-specific key metadata for encryption
     #[builder(default)]
-    pub(crate) key_metadata: Vec<u8>,
+    pub(crate) key_metadata: Option<Vec<u8>>,
     /// field id: 132
     /// element field id: 133
     ///
@@ -1164,8 +1164,8 @@ impl DataFile {
         &self.upper_bounds
     }
     /// Get the Implementation-specific key metadata for the data file.
- pub fn key_metadata(&self) -> &[u8] { - &self.key_metadata + pub fn key_metadata(&self) -> Option<&[u8]> { + self.key_metadata.as_deref() } /// Get the split offsets of the data file. /// For example, all row group offsets in a Parquet file. @@ -1378,12 +1378,13 @@ mod _serde { nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?), lower_bounds: Some(to_bytes_entry(value.lower_bounds)?), upper_bounds: Some(to_bytes_entry(value.upper_bounds)?), - key_metadata: Some(serde_bytes::ByteBuf::from(value.key_metadata)), + key_metadata: value.key_metadata.map(serde_bytes::ByteBuf::from), split_offsets: Some(value.split_offsets), equality_ids: Some(value.equality_ids), sort_order_id: value.sort_order_id, }) } + pub fn try_into( self, partition_type: &StructType, @@ -1441,7 +1442,7 @@ mod _serde { .map(|v| parse_bytes_entry(v, schema)) .transpose()? .unwrap_or_default(), - key_metadata: self.key_metadata.map(|v| v.to_vec()).unwrap_or_default(), + key_metadata: self.key_metadata.map(|v| v.to_vec()), split_offsets: self.split_offsets.unwrap_or_default(), equality_ids: self.equality_ids.unwrap_or_default(), sort_order_id: self.sort_order_id, @@ -1657,7 +1658,7 @@ mod tests { nan_value_counts: HashMap::new(), lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), - key_metadata: Vec::new(), + key_metadata: None, split_offsets: vec![4], equality_ids: Vec::new(), sort_order_id: None, @@ -1813,7 +1814,7 @@ mod tests { nan_value_counts: HashMap::new(), lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), - key_metadata: vec![], + key_metadata: None, split_offsets: vec![4], equality_ids: vec![], sort_order_id: None, @@ -1880,7 +1881,7 @@ mod tests { nan_value_counts: HashMap::new(), lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]), upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]), - key_metadata: vec![], + key_metadata: None, split_offsets: vec![4], equality_ids: vec![], sort_order_id: Some(0), @@ -1960,7 +1961,7 @@ mod tests { (2, Datum::string("a")), (3, Datum::string("x")) ]), - key_metadata: vec![], + key_metadata: None, split_offsets: vec![4], equality_ids: vec![], sort_order_id: Some(0), @@ -2035,7 +2036,7 @@ mod tests { (2, Datum::int(2)), (3, Datum::string("x")) ]), - key_metadata: vec![], + key_metadata: None, split_offsets: vec![4], equality_ids: vec![], sort_order_id: None, @@ -2105,7 +2106,7 @@ mod tests { (1, Datum::long(1)), (2, Datum::int(2)), ]), - key_metadata: vec![], + key_metadata: None, split_offsets: vec![4], equality_ids: vec![], sort_order_id: None, @@ -2183,7 +2184,7 @@ mod tests { nan_value_counts: HashMap::new(), lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), - key_metadata: Vec::new(), + key_metadata: None, split_offsets: vec![4], equality_ids: Vec::new(), sort_order_id: None, @@ -2214,7 +2215,7 @@ mod tests { nan_value_counts: HashMap::new(), lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), - key_metadata: Vec::new(), + key_metadata: None, split_offsets: vec![4], equality_ids: Vec::new(), sort_order_id: None, @@ -2246,7 +2247,7 @@ mod tests { nan_value_counts: HashMap::new(), lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), - key_metadata: Vec::new(), + key_metadata: None, split_offsets: vec![4], equality_ids: Vec::new(), sort_order_id: None, @@ -2278,7 +2279,7 @@ mod tests { nan_value_counts: HashMap::new(), lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), - key_metadata: Vec::new(), + key_metadata: None, 
split_offsets: vec![4], equality_ids: Vec::new(), sort_order_id: None, diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 09f9a7057..596228f7c 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -381,7 +381,7 @@ impl ParquetWriter { // # TODO(#417) // - nan_value_counts // - distinct_counts - .key_metadata(metadata.footer_signing_key_metadata.unwrap_or_default()) + .key_metadata(metadata.footer_signing_key_metadata) .split_offsets( metadata .row_groups From 7981def302774ad01ae65fd649a02f1686653b83 Mon Sep 17 00:00:00 2001 From: feniljain <49019259+feniljain@users.noreply.github.com> Date: Sat, 14 Dec 2024 17:01:22 +0530 Subject: [PATCH 3/6] test: append partition data file (#742) * test: append partition data file * chore: fix `compatible` spell mistake --- crates/iceberg/src/transaction.rs | 6 +- .../tests/append_partition_data_file_test.rs | 251 ++++++++++++++++++ 2 files changed, 254 insertions(+), 3 deletions(-) create mode 100644 crates/integration_tests/tests/append_partition_data_file_test.rs diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index edf1a8596..f58536a2c 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -317,7 +317,7 @@ impl<'a> SnapshotProduceAction<'a> { if partition_value.fields().len() != partition_type.fields().len() { return Err(Error::new( ErrorKind::DataInvalid, - "Partition value is not compatitable with partition type", + "Partition value is not compatible with partition type", )); } for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) { @@ -334,7 +334,7 @@ impl<'a> SnapshotProduceAction<'a> { { return Err(Error::new( ErrorKind::DataInvalid, - "Partition value is not compatitable partition type", + "Partition value is not compatible partition type", )); } } @@ -784,7 +784,7 @@ mod tests { let tx = Transaction::new(&table); let mut action = tx.fast_append(None, vec![]).unwrap(); - // check add data file with uncompatitable partition value + // check add data file with incompatible partition value let data_file = DataFileBuilder::default() .content(DataContentType::Data) .file_path("test/3.parquet".to_string()) diff --git a/crates/integration_tests/tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/append_partition_data_file_test.rs new file mode 100644 index 000000000..103021532 --- /dev/null +++ b/crates/integration_tests/tests/append_partition_data_file_test.rs @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Integration test for partition data file

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::spec::{
    Literal, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Struct, Transform, Type,
    UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
use iceberg_integration_tests::set_test_fixture;
use parquet::file::properties::WriterProperties;

#[tokio::test]
async fn test_append_partition_data_file() {
    let fixture = set_test_fixture("test_partition_data_file").await;

    let ns = Namespace::with_properties(
        NamespaceIdent::from_strs(["iceberg", "rust"]).unwrap(),
        HashMap::from([
            ("owner".to_string(), "ray".to_string()),
            ("community".to_string(), "apache".to_string()),
        ]),
    );

    fixture
        .rest_catalog
        .create_namespace(ns.name(), ns.properties().clone())
        .await
        .unwrap();

    let schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    let unbound_partition_spec = UnboundPartitionSpec::builder()
        .add_partition_field(2, "id", Transform::Identity)
        .expect("could not add partition field")
        .build();

    let partition_spec = unbound_partition_spec
        .bind(schema.clone())
        .expect("could not bind to schema");

    let table_creation = TableCreation::builder()
        .name("t1".to_string())
        .schema(schema.clone())
        .partition_spec(partition_spec)
        .build();

    let table = fixture
        .rest_catalog
        .create_table(ns.name(), table_creation)
        .await
        .unwrap();

    // Create the writer and write the data
    let schema: Arc<arrow_schema::Schema> = Arc::new(
        table
            .metadata()
            .current_schema()
            .as_ref()
            .try_into()
            .unwrap(),
    );

    let first_partition_id_value = 100;

    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
    let file_name_generator = DefaultFileNameGenerator::new(
        "test".to_string(),
        None,
        iceberg::spec::DataFileFormat::Parquet,
    );

    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
        table.file_io().clone(),
        location_generator.clone(),
        file_name_generator.clone(),
    );

    let mut data_file_writer_valid = DataFileWriterBuilder::new(
        parquet_writer_builder.clone(),
        Some(Struct::from_iter([Some(Literal::Primitive(
            PrimitiveLiteral::Int(first_partition_id_value),
        ))])),
    )
    .build()
    .await
    .unwrap();

    let col1 = StringArray::from(vec![Some("foo1"), Some("foo2")]);
    let col2 = Int32Array::from(vec![
        Some(first_partition_id_value),
        Some(first_partition_id_value),
    ]);
    let col3 = BooleanArray::from(vec![Some(true), Some(false)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![
        Arc::new(col1) as ArrayRef,
        Arc::new(col2) as ArrayRef,
Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + + data_file_writer_valid.write(batch.clone()).await.unwrap(); + let data_file_valid = data_file_writer_valid.close().await.unwrap(); + + // commit result + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action + .add_data_files(data_file_valid.clone()) + .unwrap(); + let tx = append_action.apply().await.unwrap(); + let table = tx.commit(&fixture.rest_catalog).await.unwrap(); + + // check result + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0], batch); + + test_schema_incompatible_partition_type( + parquet_writer_builder.clone(), + batch.clone(), + table.clone(), + ) + .await; + + test_schema_incompatible_partition_fields( + parquet_writer_builder, + batch, + table, + first_partition_id_value, + ) + .await; +} + +async fn test_schema_incompatible_partition_type( + parquet_writer_builder: ParquetWriterBuilder< + DefaultLocationGenerator, + DefaultFileNameGenerator, + >, + batch: RecordBatch, + table: Table, +) { + // test writing different "type" of partition than mentioned in schema + let mut data_file_writer_invalid = DataFileWriterBuilder::new( + parquet_writer_builder.clone(), + Some(Struct::from_iter([Some(Literal::Primitive( + PrimitiveLiteral::Boolean(true), + ))])), + ) + .build() + .await + .unwrap(); + + data_file_writer_invalid.write(batch.clone()).await.unwrap(); + let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); + + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + if append_action + .add_data_files(data_file_invalid.clone()) + .is_ok() + { + panic!("diverging partition info should have returned error"); + } +} + +async fn test_schema_incompatible_partition_fields( + parquet_writer_builder: ParquetWriterBuilder< + DefaultLocationGenerator, + DefaultFileNameGenerator, + >, + batch: RecordBatch, + table: Table, + first_partition_id_value: i32, +) { + // test writing different number of partition fields than mentioned in schema + + let mut data_file_writer_invalid = DataFileWriterBuilder::new( + parquet_writer_builder, + Some(Struct::from_iter([ + Some(Literal::Primitive(PrimitiveLiteral::Int( + first_partition_id_value, + ))), + Some(Literal::Primitive(PrimitiveLiteral::Int( + first_partition_id_value, + ))), + ])), + ) + .build() + .await + .unwrap(); + + data_file_writer_invalid.write(batch.clone()).await.unwrap(); + let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); + + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + if append_action + .add_data_files(data_file_invalid.clone()) + .is_ok() + { + panic!("passing different number of partition fields should have returned error"); + } +} From b9f1849e6719101c4e4da17b970cdbe7a0cd9412 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 14 Dec 2024 20:02:32 +0800 Subject: [PATCH 4/6] chore: Add more debug message inside error (#793) Signed-off-by: Xuanwo --- crates/catalog/rest/src/client.rs | 58 ++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 53dcd4cee..7027edef8 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -134,28 +134,39 @@ impl 
HttpClient {
             .request(Method::POST, &self.token_endpoint)
             .form(&params)
             .build()?;
+        let auth_url = auth_req.url().clone();
+
         let auth_resp = self.client.execute(auth_req).await?;
 
         let auth_res: TokenResponse = if auth_resp.status().as_u16() == OK {
-            let text = auth_resp.bytes().await?;
+            let text = auth_resp
+                .bytes()
+                .await
+                .map_err(|err| err.with_url(auth_url.clone()))?;
             Ok(serde_json::from_slice(&text).map_err(|e| {
                 Error::new(
                     ErrorKind::Unexpected,
                     "Failed to parse response from rest catalog server!",
                 )
+                .with_context("operation", "auth")
+                .with_context("url", auth_url.to_string())
                 .with_context("json", String::from_utf8_lossy(&text))
                 .with_source(e)
             })?)
         } else {
             let code = auth_resp.status();
-            let text = auth_resp.bytes().await?;
+            let text = auth_resp
+                .bytes()
+                .await
+                .map_err(|err| err.with_url(auth_url.clone()))?;
             let e: ErrorResponse = serde_json::from_slice(&text).map_err(|e| {
                 Error::new(
                     ErrorKind::Unexpected,
                     "Failed to parse response from rest catalog server!",
                 )
-                .with_context("json", String::from_utf8_lossy(&text))
                 .with_context("code", code.to_string())
+                .with_context("operation", "auth")
+                .with_context("url", auth_url.to_string())
+                .with_context("json", String::from_utf8_lossy(&text))
                 .with_source(e)
             })?;
             Err(Error::from(e))
@@ -193,28 +204,41 @@ impl HttpClient {
     ) -> Result<R> {
         self.authenticate(&mut request).await?;
 
+        let method = request.method().clone();
+        let url = request.url().clone();
+
         let resp = self.client.execute(request).await?;
 
         if resp.status().as_u16() == SUCCESS_CODE {
-            let text = resp.bytes().await?;
+            let text = resp
+                .bytes()
+                .await
+                .map_err(|err| err.with_url(url.clone()))?;
             Ok(serde_json::from_slice::<R>(&text).map_err(|e| {
                 Error::new(
                     ErrorKind::Unexpected,
                     "Failed to parse response from rest catalog server!",
                 )
+                .with_context("method", method.to_string())
+                .with_context("url", url.to_string())
                 .with_context("json", String::from_utf8_lossy(&text))
                 .with_source(e)
             })?)
         } else {
             let code = resp.status();
-            let text = resp.bytes().await?;
+            let text = resp
+                .bytes()
+                .await
+                .map_err(|err| err.with_url(url.clone()))?;
             let e = serde_json::from_slice::<ErrorResponse>(&text).map_err(|e| {
                 Error::new(
                     ErrorKind::Unexpected,
                     "Failed to parse response from rest catalog server!",
                 )
-                .with_context("json", String::from_utf8_lossy(&text))
                 .with_context("code", code.to_string())
+                .with_context("method", method.to_string())
+                .with_context("url", url.to_string())
+                .with_context("json", String::from_utf8_lossy(&text))
                 .with_source(e)
             })?;
             Err(e.into())
@@ -227,20 +251,28 @@ impl HttpClient {
     ) -> Result<()> {
         self.authenticate(&mut request).await?;
 
+        let method = request.method().clone();
+        let url = request.url().clone();
+
         let resp = self.client.execute(request).await?;
 
         if resp.status().as_u16() == SUCCESS_CODE {
             Ok(())
         } else {
             let code = resp.status();
-            let text = resp.bytes().await?;
+            let text = resp
+                .bytes()
+                .await
+                .map_err(|err| err.with_url(url.clone()))?;
             let e = serde_json::from_slice::<ErrorResponse>(&text).map_err(|e| {
                 Error::new(
                     ErrorKind::Unexpected,
                     "Failed to parse response from rest catalog server!",
                 )
-                .with_context("json", String::from_utf8_lossy(&text))
                 .with_context("code", code.to_string())
+                .with_context("method", method.to_string())
+                .with_context("url", url.to_string())
+                .with_context("json", String::from_utf8_lossy(&text))
                 .with_source(e)
             })?;
             Err(e.into())
@@ -255,19 +287,27 @@ impl HttpClient {
     ) -> Result<R> {
         self.authenticate(&mut request).await?;
 
+        let method = request.method().clone();
+        let url = request.url().clone();
+
         let resp = self.client.execute(request).await?;
 
         if let Some(ret) = handler(&resp) {
             Ok(ret)
         } else {
             let code = resp.status();
-            let text = resp.bytes().await?;
+            let text = resp
+                .bytes()
+                .await
+                .map_err(|err| err.with_url(url.clone()))?;
             let e = serde_json::from_slice::<ErrorResponse>(&text).map_err(|e| {
                 Error::new(
                     ErrorKind::Unexpected,
                     "Failed to parse response from rest catalog server!",
                 )
                 .with_context("code", code.to_string())
+                .with_context("method", method.to_string())
+                .with_context("url", url.to_string())
                 .with_context("json", String::from_utf8_lossy(&text))
                 .with_source(e)
             })?;
             Err(e.into())
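PATCH 4's pattern — capture the request method and URL up front, then attach them along with the HTTP code and raw body to every failure path — can be shown with a toy error type. The sketch below is a stand-in, not iceberg's actual `Error`; only the chained `with_context` shape mirrors the diff above.

use std::fmt;

#[derive(Debug)]
struct Error {
    message: String,
    context: Vec<(&'static str, String)>,
}

impl Error {
    fn new(message: impl Into<String>) -> Self {
        Self { message: message.into(), context: Vec::new() }
    }

    // Consuming builder, mirroring the with_context chaining in the diff.
    fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
        self.context.push((key, value.into()));
        self
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.message)?;
        for (k, v) in &self.context {
            write!(f, "\n  {k}: {v}")?;
        }
        Ok(())
    }
}

fn main() {
    // A hypothetical failed request; the URL is illustrative only.
    let err = Error::new("Failed to parse response from rest catalog server!")
        .with_context("code", "500")
        .with_context("method", "GET")
        .with_context("url", "https://catalog.example.com/v1/config");
    println!("{err}");
}

The point of cloning method and URL before `self.client.execute(request)` is that `execute` consumes the request, so the debugging context must be saved beforehand.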
From b9f1849e6719101c4e4da17b970cdbe7a0cd9412 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Sat, 14 Dec 2024 20:03:27 +0800
Subject: [PATCH 5/6] fix: Error source from cache has been shadowed (#792)

* feat: Print debug source error instead

Signed-off-by: Xuanwo

* Don't hide internal source error

Signed-off-by: Xuanwo

* revert error fmt changes, not related

Signed-off-by: Xuanwo

---------

Signed-off-by: Xuanwo
---
 crates/iceberg/src/io/object_cache.rs | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs
index 809db33f5..6ea7594ba 100644
--- a/crates/iceberg/src/io/object_cache.rs
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -141,7 +141,15 @@ impl ObjectCache {
             .entry_by_ref(&key)
             .or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata))
             .await
-            .map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))?
+            .map_err(|err| {
+                Arc::try_unwrap(err).unwrap_or_else(|err| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Failed to load manifest list in cache",
+                    )
+                    .with_source(err)
+                })
+            })?
             .into_value();
 
         match cache_entry {
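PATCH 5's core technique is `Arc::try_unwrap`: the cache hands errors back as `Arc<E>`, and flattening them to a message string (as the removed line did) discarded the source chain. The sketch below is a simplified stand-in using a toy `AppError`, not the crate's `Error` type, assuming only std.

use std::sync::Arc;

#[derive(Debug)]
struct AppError(String);

// If we hold the last reference, recover the owned error (source chain
// intact); otherwise wrap the still-shared Arc instead of dropping it.
fn recover(err: Arc<AppError>) -> AppError {
    Arc::try_unwrap(err).unwrap_or_else(|shared| {
        AppError(format!("failed to load manifest list in cache: {shared:?}"))
    })
}

fn main() {
    // Sole owner: try_unwrap succeeds and yields the original error.
    let sole = Arc::new(AppError("io failure".into()));
    println!("{:?}", recover(sole));

    // Still shared elsewhere: try_unwrap fails, so we wrap the Arc.
    let shared = Arc::new(AppError("io failure".into()));
    let _held = Arc::clone(&shared);
    println!("{:?}", recover(shared));
}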
From 54926a2ded1b9747ffcded2ced5ce64d4f900c56 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Sat, 14 Dec 2024 20:25:55 +0800
Subject: [PATCH 6/6] fix(catalog/rest): Ensure token been reused correctly (#801)

* fix(catalog/rest): Ensure token been reused correctly

Signed-off-by: Xuanwo

* Fix oauth test

Signed-off-by: Xuanwo

* Fix tests

Signed-off-by: Xuanwo

---------

Signed-off-by: Xuanwo
---
 crates/catalog/rest/src/catalog.rs | 17 ++++++++++-------
 crates/catalog/rest/src/client.rs  | 27 +++++++++++++++++++++++++++
 2 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index fce5fe2be..96da5dc95 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -256,9 +256,10 @@ impl RestCatalog {
     async fn context(&self) -> Result<&RestContext> {
         self.ctx
             .get_or_try_init(|| async {
-                let catalog_config = RestCatalog::load_config(&self.user_config).await?;
+                let client = HttpClient::new(&self.user_config)?;
+                let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
                 let config = self.user_config.clone().merge_with_config(catalog_config);
-                let client = HttpClient::new(&config)?;
+                let client = client.update_with(&config)?;
 
                 Ok(RestContext { config, client })
             })
@@ -268,9 +269,10 @@ impl RestCatalog {
     /// Load the runtime config from the server by user_config.
     ///
     /// It's required for a rest catalog to update it's config after creation.
-    async fn load_config(user_config: &RestCatalogConfig) -> Result<CatalogConfig> {
-        let client = HttpClient::new(user_config)?;
-
+    async fn load_config(
+        client: &HttpClient,
+        user_config: &RestCatalogConfig,
+    ) -> Result<CatalogConfig> {
         let mut request = client.request(Method::GET, user_config.config_endpoint());
 
         if let Some(warehouse_location) = &user_config.warehouse {
@@ -280,6 +282,7 @@ impl RestCatalog {
         let config = client
             .query::<CatalogConfig>(request.build()?)
             .await?;
+
         Ok(config)
     }
@@ -777,7 +780,7 @@ mod tests {
                     "expires_in": 86400
                 }"#,
             )
-            .expect(2)
+            .expect(1)
            .create_async()
            .await
     }
@@ -831,7 +834,7 @@ mod tests {
                    "expires_in": 86400
                }"#,
            )
-            .expect(2)
+            .expect(1)
            .create_async()
            .await;
diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs
index 7027edef8..e06090134 100644
--- a/crates/catalog/rest/src/client.rs
+++ b/crates/catalog/rest/src/client.rs
@@ -54,6 +54,7 @@ impl Debug for HttpClient {
 }
 
 impl HttpClient {
+    /// Create a new http client.
     pub fn new(cfg: &RestCatalogConfig) -> Result<Self> {
         Ok(HttpClient {
             client: Client::new(),
@@ -66,6 +67,32 @@ impl HttpClient {
         })
     }
 
+    /// Update the http client with new configuration.
+    ///
+    /// If cfg carries new value, we will use cfg instead.
+    /// Otherwise, we will keep the old value.
+    pub fn update_with(self, cfg: &RestCatalogConfig) -> Result<Self> {
+        Ok(HttpClient {
+            client: self.client,
+
+            token: Mutex::new(
+                cfg.token()
+                    .or_else(|| self.token.into_inner().ok().flatten()),
+            ),
+            token_endpoint: (!cfg.get_token_endpoint().is_empty())
+                .then(|| cfg.get_token_endpoint())
+                .unwrap_or(self.token_endpoint),
+            credential: cfg.credential().or(self.credential),
+            extra_headers: (!cfg.extra_headers()?.is_empty())
+                .then(|| cfg.extra_headers())
+                .transpose()?
+                .unwrap_or(self.extra_headers),
+            extra_oauth_params: (!cfg.extra_oauth_params().is_empty())
+                .then(|| cfg.extra_oauth_params())
+                .unwrap_or(self.extra_oauth_params),
+        })
+    }
+
     /// This API is testing only to assert the token.
     #[cfg(test)]
     pub(crate) async fn token(&self) -> Option<String> {