Skip to content

feat: Use bloom filter when reading parquet to skip row groups #7821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 25, 2023
45 changes: 40 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ pub struct ParquetExec {
/// Override for `Self::with_enable_page_index`. If None, uses
/// values from base_config
enable_page_index: Option<bool>,
/// Override for `Self::with_enable_bloom_filter`. If None, uses
/// values from base_config
enable_bloom_filter: Option<bool>,
/// Base configuration for this scan
base_config: FileScanConfig,
projected_statistics: Statistics,
Expand Down Expand Up @@ -151,6 +154,7 @@ impl ParquetExec {
pushdown_filters: None,
reorder_filters: None,
enable_page_index: None,
enable_bloom_filter: None,
base_config,
projected_schema,
projected_statistics,
Expand Down Expand Up @@ -244,6 +248,18 @@ impl ParquetExec {
.unwrap_or(config_options.execution.parquet.enable_page_index)
}

/// If enabled, the reader will read by the bloom filter
pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
self.enable_bloom_filter = Some(enable_bloom_filter);
self
}

/// Return the value described in [`Self::with_enable_bloom_filter`]:
/// the explicit per-exec override if one was set, otherwise the
/// session-level `bloom_filter_enabled` config value.
fn enable_bloom_filter(&self, config_options: &ConfigOptions) -> bool {
    match self.enable_bloom_filter {
        Some(explicit) => explicit,
        None => config_options.execution.parquet.bloom_filter_enabled,
    }
}

/// Redistribute files across partitions according to their size
/// See comments on `get_file_groups_repartitioned()` for more detail.
pub fn get_repartitioned(
Expand Down Expand Up @@ -373,6 +389,7 @@ impl ExecutionPlan for ParquetExec {
pushdown_filters: self.pushdown_filters(config_options),
reorder_filters: self.reorder_filters(config_options),
enable_page_index: self.enable_page_index(config_options),
enable_bloom_filter: self.enable_bloom_filter(config_options),
};

let stream =
Expand Down Expand Up @@ -406,6 +423,7 @@ struct ParquetOpener {
pushdown_filters: bool,
reorder_filters: bool,
enable_page_index: bool,
enable_bloom_filter: bool,
}

impl FileOpener for ParquetOpener {
Expand Down Expand Up @@ -440,6 +458,7 @@ impl FileOpener for ParquetOpener {
self.enable_page_index,
&self.page_pruning_predicate,
);
let enable_bloom_filter = self.enable_bloom_filter;
let limit = self.limit;

Ok(Box::pin(async move {
Expand Down Expand Up @@ -482,16 +501,32 @@ impl FileOpener for ParquetOpener {
};
};

// Row group pruning: attempt to skip entire row_groups
// Row group pruning by statistics: attempt to skip entire row_groups
// using metadata on the row groups
let file_metadata = builder.metadata();
let row_groups = row_groups::prune_row_groups(
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
file_metadata.row_groups(),
file_range,
pruning_predicate.as_ref().map(|p| p.as_ref()),
predicate,
&file_metrics,
);

// Bloom filter pruning: if bloom filters are enabled, attempt to skip entire row_groups
// using bloom filters on the row groups
if enable_bloom_filter && !row_groups.is_empty() {
if let Some(predicate) = predicate {
row_groups = row_groups::prune_row_groups_by_bloom_filters(
&mut builder,
&row_groups,
file_metadata.row_groups(),
predicate,
&file_metrics,
)
.await;
}
}

// page index pruning: if all data on individual pages can
// be ruled using page metadata, rows from other columns
// with that range can be skipped as well
Expand Down Expand Up @@ -567,7 +602,7 @@ impl DefaultParquetFileReaderFactory {
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage
struct ParquetFileReader {
pub(crate) struct ParquetFileReader {
file_metrics: ParquetFileMetrics,
inner: ParquetObjectReader,
}
Expand Down
Loading