[Just for benchmark, not merged] Modify skip ratio, and remove locking #12765


Closed
wants to merge 14 commits into from
4 changes: 2 additions & 2 deletions datafusion/common/src/config.rs
@@ -325,11 +325,11 @@ config_namespace! {
/// Aggregation ratio (number of distinct groups / number of input rows)
/// threshold for skipping partial aggregation. If the value is greater
/// then partial aggregation will skip aggregation for further input
- pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
+ pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.1

/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
- pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
+ pub skip_partial_aggregation_probe_rows_threshold: usize, default = 0

/// Should DataFusion use row number estimates at the input to decide
/// whether increasing parallelism is beneficial or not. By default,
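For reference, both options are ordinary ExecutionOptions fields exposed under `datafusion.execution.*`, so the benchmark values above can also be applied at runtime instead of patching the compiled-in defaults. A minimal sketch, assuming the standard `SessionConfig` / `SessionContext` API; the helper function name is illustrative and not part of this PR:

use datafusion::prelude::{SessionConfig, SessionContext};

/// Illustrative helper (not part of this PR): build a context that uses the
/// benchmark settings rather than changing the defaults in config.rs.
fn context_with_benchmark_settings() -> SessionContext {
    let mut config = SessionConfig::new();
    // Probe the ratio immediately (rows threshold = 0) and skip partial
    // aggregation once distinct groups / input rows >= 0.1.
    config
        .options_mut()
        .execution
        .skip_partial_aggregation_probe_ratio_threshold = 0.1;
    config
        .options_mut()
        .execution
        .skip_partial_aggregation_probe_rows_threshold = 0;
    SessionContext::new_with_config(config)
}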
30 changes: 6 additions & 24 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -137,14 +137,6 @@ struct SkipAggregationProbe {
/// Flag indicating further updates of `SkipAggregationProbe` state won't
/// make any effect (set either while probing or on probing completion)
is_locked: bool,

- /// Number of rows where state was output without aggregation.
- ///
- /// * If 0, all input rows were aggregated (should_skip was always false)
- ///
- /// * if greater than zero, the number of rows which were output directly
- /// without aggregation
- skipped_aggregation_rows: metrics::Count,
}

impl SkipAggregationProbe {
@@ -160,7 +152,6 @@ impl SkipAggregationProbe {
probe_ratio_threshold,
should_skip: false,
is_locked: false,
- skipped_aggregation_rows,
}
}

@@ -171,26 +162,17 @@
/// aggregation ratio and sets `should_skip` flag
/// - if `should_skip` is set, locks further state updates
fn update_state(&mut self, input_rows: usize, num_groups: usize) {
- if self.is_locked {
- return;
- }
self.input_rows += input_rows;
self.num_groups = num_groups;
if self.input_rows >= self.probe_rows_threshold {
self.should_skip = self.num_groups as f64 / self.input_rows as f64
>= self.probe_ratio_threshold;
- self.is_locked = true;
}
}

fn should_skip(&self) -> bool {
self.should_skip
}

- /// Record the number of rows that were output directly without aggregation
- fn record_skipped(&mut self, batch: &RecordBatch) {
- self.skipped_aggregation_rows.add(batch.num_rows());
- }
}

/// HashTable based Grouping Aggregator
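With the `is_locked` short-circuit gone, `update_state` re-evaluates the skip decision on every call rather than freezing it the first time the rows threshold is crossed. A self-contained sketch of the probe logic as modified here (a simplified stand-in, not the actual DataFusion type; whether the surrounding stream keeps calling `update_state` after it switches modes is decided by the stream code in the hunks below, not by this struct):

// Simplified stand-in for `SkipAggregationProbe` after this PR: no lock,
// so the ratio check runs on every update.
struct Probe {
    input_rows: usize,
    num_groups: usize,
    probe_rows_threshold: usize,
    probe_ratio_threshold: f64,
    should_skip: bool,
}

impl Probe {
    fn update_state(&mut self, input_rows: usize, num_groups: usize) {
        self.input_rows += input_rows;
        self.num_groups = num_groups;
        if self.input_rows >= self.probe_rows_threshold {
            self.should_skip = self.num_groups as f64 / self.input_rows as f64
                >= self.probe_ratio_threshold;
        }
    }
}

fn main() {
    // Benchmark settings from this PR: probe immediately, skip at ratio >= 0.1.
    let mut probe = Probe {
        input_rows: 0,
        num_groups: 0,
        probe_rows_threshold: 0,
        probe_ratio_threshold: 0.1,
        should_skip: false,
    };

    // A high-cardinality batch turns skipping on (4096 / 8192 = 0.5 >= 0.1)...
    probe.update_state(8192, 4096);
    assert!(probe.should_skip);

    // ...and, without the lock, a later low-cardinality batch can turn it
    // back off (4096 / 1_008_192 < 0.1).
    probe.update_state(1_000_000, 4096);
    assert!(!probe.should_skip);
}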
@@ -616,7 +598,7 @@ impl Stream for GroupedHashAggregateStream {

// Do the grouping
extract_ok!(self.group_aggregate_batch(batch));

self.update_skip_aggregation_probe(input_rows);

// If we can begin emitting rows, do so,
@@ -640,10 +622,10 @@
break 'reading_input;
}

- extract_ok!(self.emit_early_if_necessary());

extract_ok!(self.switch_to_skip_aggregation());

+ extract_ok!(self.emit_early_if_necessary());

timer.done();
}

@@ -700,9 +682,9 @@
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let _timer = elapsed_compute.timer();
- if let Some(probe) = self.skip_aggregation_probe.as_mut() {
- probe.record_skipped(&batch);
- }
+ // if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+ //     probe.record_skipped(&batch);
+ // }
let states = self.transform_to_states(batch)?;
return Poll::Ready(Some(Ok(
states.record_output(&self.baseline_metrics)