From 3335586d07e764f001fc912eb4f3918db62a814d Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Tue, 23 Apr 2024 20:05:49 +0100 Subject: [PATCH] feat: add InclusiveMetricsEvaluator --- .../visitors/inclusive_metrics_evaluator.rs | 657 ++++++++++++++++++ .../src/expr/visitors/manifest_evaluator.rs | 2 - crates/iceberg/src/expr/visitors/mod.rs | 1 + crates/iceberg/src/scan.rs | 46 +- crates/iceberg/src/spec/values.rs | 22 + 5 files changed, 718 insertions(+), 10 deletions(-) create mode 100644 crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs new file mode 100644 index 000000000..2def7b906 --- /dev/null +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -0,0 +1,657 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{DataFile, Datum, Literal, PrimitiveLiteral};
+use crate::{Error, ErrorKind};
+use fnv::FnvHashSet;
+
+/// `IN` predicates with more literals than this are not checked against the
+/// column bounds; the file is conservatively reported as a possible match.
+const IN_PREDICATE_LIMIT: usize = 200;
+const ROWS_MIGHT_MATCH: crate::Result<bool> = Ok(true);
+const ROWS_CANNOT_MATCH: crate::Result<bool> = Ok(false);
+
+/// Decides, from a data file's column metrics alone, whether the file may
+/// contain rows that match a bound filter predicate.
+///
+/// The evaluation is inclusive: `true` means "rows might match", `false`
+/// means "no row can match". Whenever the metrics are missing or unreliable
+/// the answer must therefore be `true`.
+pub(crate) struct InclusiveMetricsEvaluator {
+    include_empty_files: bool,
+    filter: BoundPredicate,
+}
+
+impl InclusiveMetricsEvaluator {
+    /// Creates an evaluator for `filter`.
+    ///
+    /// Files whose record count is zero are rejected by [`Self::eval`]
+    /// unless `include_empty_files` is `true`.
+    pub(crate) fn new(filter: BoundPredicate, include_empty_files: bool) -> crate::Result<Self> {
+        Ok(InclusiveMetricsEvaluator {
+            include_empty_files,
+            filter,
+        })
+    }
+
+    /// Evaluate this `InclusiveMetricsEvaluator`'s filter predicate against the
+    /// metrics of the provided [`DataFile`] (record count, value / null / NaN
+    /// counts and lower / upper bounds). Used by `TableScan` to see if this
+    /// `DataFile` may contain data that matches the scan's filter.
+    ///
+    /// Returns `Ok(false)` only when no row in the file can match.
+    pub(crate) fn eval(&self, data_file: &DataFile) -> crate::Result<bool> {
+        if !self.include_empty_files && data_file.record_count == 0 {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let mut evaluator = DataFileFilterVisitor::new(self, data_file);
+
+        visit(&mut evaluator, &self.filter)
+    }
+}
+
+/// Visitor that answers each predicate node from the metrics of one data file.
+struct DataFileFilterVisitor<'a> {
+    inclusive_metrics_evaluator: &'a InclusiveMetricsEvaluator,
+    data_file: &'a DataFile,
+}
+
+impl BoundPredicateVisitor for DataFileFilterVisitor<'_> {
+    type T = bool;
+
+    fn always_true(&mut self) -> crate::Result<bool> {
+        ROWS_MIGHT_MATCH
+    }
+
+    fn always_false(&mut self) -> crate::Result<bool> {
+        ROWS_CANNOT_MATCH
+    }
+
+    fn and(&mut self, lhs: bool, rhs: bool) -> crate::Result<bool> {
+        Ok(lhs && rhs)
+    }
+
+    fn or(&mut self, lhs: bool, rhs: bool) -> crate::Result<bool> {
+        Ok(lhs || rhs)
+    }
+
+    fn not(&mut self, inner: bool) -> crate::Result<bool> {
+        // NOTE(review): negating "rows might match" yields "rows cannot match",
+        // which is only sound if NOT nodes have been pushed down / rewritten
+        // away before this visitor runs -- confirm against the caller.
+        Ok(!inner)
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        // Only a recorded null count of exactly zero rules out null values.
+        match self.null_count(field_id) {
+            Some(&0) => ROWS_CANNOT_MATCH,
+            _ => ROWS_MIGHT_MATCH,
+        }
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nulls_only(field_id) {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        // A recorded NaN count of zero, or a column holding only nulls,
+        // means no value can be NaN.
+        if matches!(self.nan_count(field_id), Some(&0)) || self.contains_nulls_only(field_id) {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nans_only(field_id) {
+            ROWS_CANNOT_MATCH
+        } else {
+            ROWS_MIGHT_MATCH
+        }
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nulls_only(field_id) || self.contains_nans_only(field_id) {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let Some(lower_bound) = self.primitive_lower_bound(field_id, "LessThan")? else {
+            return ROWS_MIGHT_MATCH;
+        };
+
+        if datum.is_nan() || lower_bound.is_nan() {
+            // NaN makes the comparison against the bound unreliable.
+            return ROWS_MIGHT_MATCH;
+        }
+
+        // `col < datum` can only hold if the smallest value is below `datum`.
+        Ok(lower_bound < datum.literal())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nulls_only(field_id) || self.contains_nans_only(field_id) {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let Some(lower_bound) = self.primitive_lower_bound(field_id, "LessThanOrEq")? else {
+            return ROWS_MIGHT_MATCH;
+        };
+
+        if datum.is_nan() || lower_bound.is_nan() {
+            // NaN makes the comparison against the bound unreliable.
+            return ROWS_MIGHT_MATCH;
+        }
+
+        // `col <= datum` can only hold if the smallest value is at most `datum`.
+        Ok(lower_bound <= datum.literal())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nulls_only(field_id) || self.contains_nans_only(field_id) {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let Some(upper_bound) = self.primitive_upper_bound(field_id, "GreaterThan")? else {
+            return ROWS_MIGHT_MATCH;
+        };
+
+        if datum.is_nan() || upper_bound.is_nan() {
+            // NaN makes the comparison against the bound unreliable.
+            return ROWS_MIGHT_MATCH;
+        }
+
+        // `col > datum` can only hold if the largest value is above `datum`.
+        Ok(upper_bound > datum.literal())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nulls_only(field_id) || self.contains_nans_only(field_id) {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let Some(upper_bound) = self.primitive_upper_bound(field_id, "GreaterThanOrEq")? else {
+            return ROWS_MIGHT_MATCH;
+        };
+
+        if datum.is_nan() || upper_bound.is_nan() {
+            // NaN makes the comparison against the bound unreliable.
+            return ROWS_MIGHT_MATCH;
+        }
+
+        // `col >= datum` can only hold if the largest value is at least `datum`.
+        Ok(upper_bound >= datum.literal())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nulls_only(field_id) || self.contains_nans_only(field_id) {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        if let Some(lower_bound) = self.primitive_lower_bound(field_id, "Eq")? {
+            if lower_bound.is_nan() {
+                // A NaN bound is unreliable, so nothing can be ruled out.
+                return ROWS_MIGHT_MATCH;
+            }
+            if lower_bound > datum.literal() {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = self.primitive_upper_bound(field_id, "Eq")? {
+            if upper_bound.is_nan() {
+                // A NaN bound is unreliable, so nothing can be ruled out.
+                return ROWS_MIGHT_MATCH;
+            }
+            if upper_bound < datum.literal() {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        ROWS_MIGHT_MATCH
+    }
+
+    fn not_eq(
+        &mut self,
+        _reference: &BoundReference,
+        _datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        // Because the bounds are not necessarily a min or max value,
+        // this cannot be answered using them. notEq(col, X) with (X, Y)
+        // doesn't guarantee that X is a value in col.
+        ROWS_MIGHT_MATCH
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nulls_only(field_id) {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot use StartsWith operator on non-string values",
+            ));
+        };
+
+        if let Some(lower_bound) = self.lower_bound(field_id) {
+            let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = lower_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot use StartsWith operator on non-string lower_bound value",
+                ));
+            };
+
+            // Truncate both sides to the shorter length before comparing.
+            // The comparison is done on bytes: it orders UTF-8 text exactly
+            // like `str` comparison but cannot panic when the cut falls
+            // inside a multi-byte character.
+            let length = lower_bound.len().min(prefix.len());
+
+            if prefix.as_bytes()[..length] < lower_bound.as_bytes()[..length] {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = self.upper_bound(field_id) {
+            let Literal::Primitive(PrimitiveLiteral::String(upper_bound)) = upper_bound else {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "Cannot use StartsWith operator on non-string upper_bound value",
+                ));
+            };
+
+            // Same byte-wise truncated comparison as for the lower bound.
+            let length = upper_bound.len().min(prefix.len());
+
+            if prefix.as_bytes()[..length] > upper_bound.as_bytes()[..length] {
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        ROWS_MIGHT_MATCH
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.may_contain_null(field_id) {
+            return ROWS_MIGHT_MATCH;
+        }
+
+        // notStartsWith will match unless all values must start with the prefix.
+        // This happens when the lower and upper bounds both start with the prefix.
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot use NotStartsWith operator on non-string values",
+            ));
+        };
+
+        let Some(lower_bound) = self.lower_bound(field_id) else {
+            return ROWS_MIGHT_MATCH;
+        };
+
+        let Literal::Primitive(PrimitiveLiteral::String(lower_bound)) = lower_bound else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot use NotStartsWith operator on non-string lower_bound value",
+            ));
+        };
+
+        // `str::starts_with` is false for a bound shorter than the prefix and
+        // never slices inside a character, so no explicit length check or
+        // byte-offset slicing is needed.
+        if !lower_bound.starts_with(prefix.as_str()) {
+            return ROWS_MIGHT_MATCH;
+        }
+
+        let Some(upper_bound) = self.upper_bound(field_id) else {
+            return ROWS_MIGHT_MATCH;
+        };
+
+        let Literal::Primitive(PrimitiveLiteral::String(upper_bound)) = upper_bound else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot use NotStartsWith operator on non-string upper_bound value",
+            ));
+        };
+
+        if upper_bound.starts_with(prefix.as_str()) {
+            // both bounds match the prefix, so all rows must match the
+            // prefix and therefore do not satisfy the predicate
+            return ROWS_CANNOT_MATCH;
+        }
+
+        ROWS_MIGHT_MATCH
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        let field_id = reference.field().id;
+
+        if self.contains_nulls_only(field_id) || self.contains_nans_only(field_id) {
+            return ROWS_CANNOT_MATCH;
+        }
+
+        if literals.len() > IN_PREDICATE_LIMIT {
+            // skip evaluating the predicate if the number of values is too big
+            return ROWS_MIGHT_MATCH;
+        }
+
+        if let Some(lower_bound) = self.primitive_lower_bound(field_id, "In")? {
+            if lower_bound.is_nan() {
+                // A NaN bound is unreliable, so nothing can be ruled out.
+                return ROWS_MIGHT_MATCH;
+            }
+
+            if !literals.iter().any(|datum| datum.literal() >= lower_bound) {
+                // if all values are less than lower bound, rows cannot match.
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        if let Some(upper_bound) = self.primitive_upper_bound(field_id, "In")? {
+            if upper_bound.is_nan() {
+                // A NaN bound is unreliable, so nothing can be ruled out.
+                return ROWS_MIGHT_MATCH;
+            }
+
+            if !literals.iter().any(|datum| datum.literal() <= upper_bound) {
+                // if all values are greater than upper bound, rows cannot match.
+                return ROWS_CANNOT_MATCH;
+            }
+        }
+
+        ROWS_MIGHT_MATCH
+    }
+
+    fn not_in(
+        &mut self,
+        _reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> crate::Result<bool> {
+        // Because the bounds are not necessarily a min or max value,
+        // this cannot be answered using them. notIn(col, {X, ...})
+        // with (X, Y) doesn't guarantee that X is a value in col.
+        ROWS_MIGHT_MATCH
+    }
+}
+
+impl<'a> DataFileFilterVisitor<'a> {
+    pub(crate) fn new(
+        inclusive_metrics_evaluator: &'a InclusiveMetricsEvaluator,
+        data_file: &'a DataFile,
+    ) -> Self {
+        DataFileFilterVisitor {
+            inclusive_metrics_evaluator,
+            data_file,
+        }
+    }
+
+    fn nan_count(&self, field_id: i32) -> Option<&u64> {
+        self.data_file.nan_value_counts.get(&field_id)
+    }
+
+    fn null_count(&self, field_id: i32) -> Option<&u64> {
+        self.data_file.null_value_counts.get(&field_id)
+    }
+
+    fn value_count(&self, field_id: i32) -> Option<&u64> {
+        self.data_file.value_counts.get(&field_id)
+    }
+
+    fn lower_bound(&self, field_id: i32) -> Option<&Literal> {
+        self.data_file.lower_bounds.get(&field_id)
+    }
+
+    fn upper_bound(&self, field_id: i32) -> Option<&Literal> {
+        self.data_file.upper_bounds.get(&field_id)
+    }
+
+    /// Unwraps a bound that must be a primitive literal. `predicate_name`
+    /// is only used to build the error message.
+    fn as_primitive<'b>(
+        bound: &'b Literal,
+        predicate_name: &str,
+    ) -> crate::Result<&'b PrimitiveLiteral> {
+        let Literal::Primitive(primitive) = bound else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("{predicate_name} Predicate can only compare against a Primitive Literal"),
+            ));
+        };
+
+        Ok(primitive)
+    }
+
+    /// Lower bound of the column as a primitive literal; `Ok(None)` when the
+    /// file records no lower bound, `Err` when the bound is not primitive.
+    fn primitive_lower_bound(
+        &self,
+        field_id: i32,
+        predicate_name: &str,
+    ) -> crate::Result<Option<&PrimitiveLiteral>> {
+        self.lower_bound(field_id)
+            .map(|bound| Self::as_primitive(bound, predicate_name))
+            .transpose()
+    }
+
+    /// Upper bound of the column as a primitive literal; `Ok(None)` when the
+    /// file records no upper bound, `Err` when the bound is not primitive.
+    fn primitive_upper_bound(
+        &self,
+        field_id: i32,
+        predicate_name: &str,
+    ) -> crate::Result<Option<&PrimitiveLiteral>> {
+        self.upper_bound(field_id)
+            .map(|bound| Self::as_primitive(bound, predicate_name))
+            .transpose()
+    }
+
+    /// True only when both counts are recorded and equal. Missing metrics
+    /// must never be read as "all NaN", otherwise files without statistics
+    /// would be pruned.
+    fn contains_nans_only(&self, field_id: i32) -> bool {
+        matches!(
+            (self.nan_count(field_id), self.value_count(field_id)),
+            (Some(nan_count), Some(value_count)) if nan_count == value_count
+        )
+    }
+
+    /// True only when both counts are recorded and equal. Missing metrics
+    /// must never be read as "all null", otherwise files without statistics
+    /// would be pruned.
+    fn contains_nulls_only(&self, field_id: i32) -> bool {
+        matches!(
+            (self.null_count(field_id), self.value_count(field_id)),
+            (Some(null_count), Some(value_count)) if null_count == value_count
+        )
+    }
+
+    /// An unknown null count is treated as "may contain nulls".
+    fn may_contain_null(&self, field_id: i32) -> bool {
+        self.null_count(field_id)
+            .map_or(true, |&null_count| null_count > 0)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
+    use crate::expr::{Bind, Predicate};
+    use crate::spec::{
+        DataContentType, DataFile, DataFileFormat, FieldSummary, NestedField, PartitionField,
+        PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type,
+    };
+    use std::sync::Arc;
+
+    #[test]
+    fn test_data_file_no_partitions() {
+        let (table_schema_ref, _partition_spec_ref) = create_test_schema_and_partition_spec();
+
+        let partition_filter = Predicate::AlwaysTrue
+            .bind(table_schema_ref.clone(), false)
+            .unwrap();
+
+        // Second argument of `InclusiveMetricsEvaluator::new`.
+        let include_empty_files = false;
+
+        let manifest_file_partitions = vec![];
+        let data_file = create_test_data_file(manifest_file_partitions);
+
+        let inclusive_metrics_evaluator =
+            InclusiveMetricsEvaluator::new(partition_filter, include_empty_files).unwrap();
+
+        let result = inclusive_metrics_evaluator.eval(&data_file).unwrap();
+
+        assert!(result);
+    }
+
+    fn create_test_schema_and_partition_spec() -> (Arc<Schema>, Arc<PartitionSpec>) {
+        let table_schema = Schema::builder()
+            .with_fields(vec![Arc::new(NestedField::optional(
+                1,
+                "a",
+                Type::Primitive(PrimitiveType::Float),
+            ))])
+            .build()
+            .unwrap();
+        let table_schema_ref = Arc::new(table_schema);
+
+        let partition_spec = PartitionSpec::builder()
+            .with_spec_id(1)
+            .with_fields(vec![PartitionField::builder()
+                .source_id(1)
+                .name("a".to_string())
+                .field_id(1)
+                .transform(Transform::Identity)
+                .build()])
+            .build()
+            .unwrap();
+        let partition_spec_ref = Arc::new(partition_spec);
+        (table_schema_ref, partition_spec_ref)
+    }
+
+    fn create_test_data_file(_manifest_file_partitions: Vec<FieldSummary>) -> DataFile {
+        DataFile {
+            content: DataContentType::Data,
+            file_path: "/test/path".to_string(),
+            file_format: DataFileFormat::Parquet,
+            partition: Struct::empty(),
+            record_count: 10,
+            file_size_in_bytes: 10,
+            column_sizes: Default::default(),
+            value_counts: Default::default(),
+            null_value_counts: Default::default(),
+            nan_value_counts: Default::default(),
+            lower_bounds: Default::default(),
+            upper_bounds: Default::default(),
+            key_metadata: vec![],
+            split_offsets: vec![],
+            equality_ids: vec![],
+            sort_order_id: None,
+        }
+    }
+}
diff --git a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
index 16d648178..417b465d8 100644
--- a/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/manifest_evaluator.rs
@@ -29,7 +29,6 @@ use std::sync::Arc;
 pub(crate) struct ManifestEvaluator {
     partition_schema: SchemaRef,
     partition_filter: BoundPredicate,
-    case_sensitive: bool,
 }
 
impl ManifestEvaluator {
@@ -67,7 +66,6 @@ impl ManifestEvaluator {
         Ok(Self {
             partition_schema: partition_schema_ref,
             partition_filter,
-            case_sensitive,
         })
     }
 
diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs
index 709ccd6c7..805f7dd4f 100644
--- a/crates/iceberg/src/expr/visitors/mod.rs
+++ b/crates/iceberg/src/expr/visitors/mod.rs
@@ -16,5 +16,6 @@
 // under the License.
 
 pub(crate) mod bound_predicate_visitor;
+pub(crate) mod inclusive_metrics_evaluator;
 pub(crate) mod inclusive_projection;
 pub(crate) mod manifest_evaluator;
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 1e34572db..36937c2c9 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -18,6 +18,7 @@
 //! Table scan api.
 
 use crate::arrow::ArrowReaderBuilder;
+use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
 use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
 use crate::expr::{Bind, Predicate};
 use crate::io::FileIO;
@@ -174,10 +175,10 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result>;
 
 impl TableScan {
     /// Returns a stream of file scan tasks.
-
     pub async fn plan_files(&self) -> crate::Result {
         // Cache `ManifestEvaluatorFactory`s created as part of this scan
         let mut manifest_evaluator_factory_cache: HashMap = HashMap::new();
+        // Cache of `InclusiveMetricsEvaluator`s created as part of this scan
+        let mut data_file_evaluator_cache: HashMap<i32, InclusiveMetricsEvaluator> = HashMap::new();
 
         // these variables needed to ensure that we don't need to pass a
         // reference to self into `try_stream`, as it expects references
@@ -204,8 +205,7 @@ impl TableScan {
                 if let Some(filter) = filter.as_ref() {
-                    let manifest_eval_factory = manifest_evaluator_factory_cache
-                        .entry(entry.partition_spec_id)
-                        .or_insert_with_key(|key| Self::create_manifest_eval_factory(key, schema.clone(), table_metadata.clone(), case_sensitive, filter));
-
+                    // Look up (or lazily build) the evaluator for this manifest's
+                    // partition spec. The entry is matched explicitly instead of using
+                    // `or_insert_with_key` so that a failure to build the evaluator is
+                    // propagated with `?` rather than panicking inside a closure.
+                    let manifest_eval_factory = match manifest_evaluator_factory_cache
+                        .entry(entry.partition_spec_id)
+                    {
+                        std::collections::hash_map::Entry::Occupied(occupied) => occupied.into_mut(),
+                        std::collections::hash_map::Entry::Vacant(vacant) => {
+                            let evaluator = Self::create_manifest_eval_factory(
+                                vacant.key(),
+                                schema.clone(),
+                                table_metadata.clone(),
+                                case_sensitive,
+                                filter,
+                            )?;
+                            vacant.insert(evaluator)
+                        }
+                    };
 
                     // reject any manifest files whose partition values don't match the filter.
                     if !manifest_eval_factory.eval(entry)? {
@@ -217,6 +217,25 @@ impl TableScan {
 
                 let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive()));
                 while let Some(manifest_entry) = manifest_entries.next().await {
+
+                    // If this scan has a filter, check the data file evaluator cache for an existing
+                    // InclusiveMetricsEvaluator stored under this manifest's partition spec ID.
+                    // Use one from the cache if there is one. If not, create one, put it in
+                    // the cache, and take a reference to it. Errors from building the
+                    // evaluator are propagated with `?`.
+                    if let Some(filter) = filter.as_ref() {
+                        let data_file_evaluator = match data_file_evaluator_cache
+                            .entry(entry.partition_spec_id)
+                        {
+                            std::collections::hash_map::Entry::Occupied(occupied) => occupied.into_mut(),
+                            std::collections::hash_map::Entry::Vacant(vacant) => {
+                                let evaluator = Self::create_data_file_evaluator(
+                                    schema.clone(),
+                                    case_sensitive,
+                                    filter,
+                                )?;
+                                vacant.insert(evaluator)
+                            }
+                        };
+
+                        // reject any manifest entries whose data file's column metrics show
+                        // that no row can match the filter.
+                        if !data_file_evaluator.eval(manifest_entry.data_file())? {
+                            continue;
+                        }
+                    }
+
                     match manifest_entry.content_type() {
                         DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
                             yield Err(Error::new(
@@ -240,16 +259,18 @@ impl TableScan {
     }
 
     fn create_manifest_eval_factory(
-        //&self,
         id: &i32,
         schema: SchemaRef,
         table_metadata: Arc,
         case_sensitive: bool,
         filter: &Predicate,
-    ) -> ManifestEvaluator {
-        let bound_predicate = filter.bind(schema.clone(), case_sensitive).unwrap();
+    ) -> crate::Result<ManifestEvaluator> {
+        let bound_predicate = filter.bind(schema.clone(), case_sensitive)?;
 
-        let partition_spec = table_metadata.partition_spec_by_id(*id).unwrap();
+        // `ok_or_else` builds the error (and its formatted message) only when
+        // the partition spec is actually missing.
+        let partition_spec = table_metadata.partition_spec_by_id(*id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Could not find partition spec for id {id}"),
+            )
+        })?;
 
         ManifestEvaluator::new(
             partition_spec.clone(),
@@ -257,7 +278,16 @@ impl TableScan {
             bound_predicate,
             case_sensitive,
         )
-        .unwrap()
+    }
+
+    /// Binds `filter` to `schema` and wraps it in an `InclusiveMetricsEvaluator`
+    /// that rejects data files with a record count of zero.
+    fn create_data_file_evaluator(
+        schema: SchemaRef,
+        case_sensitive: bool,
+        filter: &Predicate,
+    ) -> crate::Result<InclusiveMetricsEvaluator> {
+        let bound_predicate = filter.bind(schema, case_sensitive)?;
+
+        InclusiveMetricsEvaluator::new(bound_predicate, false)
     }
 
     pub async fn to_arrow(&self) -> crate::Result {
diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index 62a1d5832..c05d96f5c 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -80,6 +80,18 @@ pub enum PrimitiveLiteral {
     Decimal(i128),
 }
 
+impl PrimitiveLiteral {
+    /// Returns true if the literal is a `Float` or `Double`
+    /// and its value is NaN; false for every other variant.
+    pub fn is_nan(&self) -> bool {
+        match self {
+            PrimitiveLiteral::Double(val) => val.is_nan(),
+            PrimitiveLiteral::Float(val) => val.is_nan(),
+            _ => false,
+        }
+    }
+}
+
 /// Literal associated with its type. The value and type pair is checked when construction, so the type and value is
 /// guaranteed to be correct when used.
 ///
@@ -689,6 +701,16 @@ impl Datum {
     pub fn data_type(&self) -> &PrimitiveType {
         &self.r#type
     }
+
+    /// Returns true if the underlying literal is a `Float` or `Double`
+    /// and its value is NaN; false for every other type.
+    pub fn is_nan(&self) -> bool {
+        // Delegate so the NaN rule lives in one place.
+        self.literal.is_nan()
+    }
 }
 
 /// Values present in iceberg type