Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve filter parallelism #15686

Merged
merged 4 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions crates/polars-core/src/datatypes/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,36 @@ impl DataType {
}
}

pub fn contains_categoricals(&self) -> bool {
use DataType::*;
match self {
#[cfg(feature = "dtype-categorical")]
Categorical(_, _) | Enum(_, _) => true,
List(inner) => inner.contains_categoricals(),
#[cfg(feature = "dtype-array")]
Array(inner, _) => inner.contains_categoricals(),
#[cfg(feature = "dtype-struct")]
Struct(fields) => fields
.iter()
.any(|field| field.dtype.contains_categoricals()),
_ => false,
}
}

pub fn contains_objects(&self) -> bool {
use DataType::*;
match self {
#[cfg(feature = "object")]
Object(_, _) => true,
List(inner) => inner.contains_objects(),
#[cfg(feature = "dtype-array")]
Array(inner, _) => inner.contains_objects(),
#[cfg(feature = "dtype-struct")]
Struct(fields) => fields.iter().any(|field| field.dtype.contains_objects()),
_ => false,
}
}

/// Check if type is sortable
pub fn is_ord(&self) -> bool {
#[cfg(feature = "dtype-categorical")]
Expand Down
16 changes: 16 additions & 0 deletions crates/polars-core/src/frame/chunks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use arrow::record_batch::RecordBatch;
use rayon::prelude::*;

use crate::prelude::*;
use crate::utils::_split_offsets;
use crate::POOL;

pub type ArrowChunk = RecordBatch<ArrayRef>;

Expand Down Expand Up @@ -40,4 +43,17 @@ impl DataFrame {
DataFrame::new_no_checks(columns)
})
}

/// Split this `DataFrame` into `n` row-slices of roughly equal height
/// (offsets computed by `_split_offsets`).
///
/// When `parallel` is true, the slices are materialized on the rayon
/// thread pool so per-slice work (e.g. null counts) runs in parallel.
pub fn split_chunks_by_n(self, n: usize, parallel: bool) -> Vec<DataFrame> {
    let offsets = _split_offsets(self.height(), n);
    let make_slice = |(offset, len): (usize, usize)| self.slice(offset as i64, len);

    if !parallel {
        return offsets.into_iter().map(make_slice).collect();
    }
    // Parallel so that per-slice computations (null_counts) run in parallel.
    POOL.install(|| offsets.into_par_iter().map(make_slice).collect())
}
}
60 changes: 41 additions & 19 deletions crates/polars-lazy/src/physical_plan/executors/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,54 @@ impl FilterExec {
}
}

fn execute_impl(
fn execute_hor(
&mut self,
df: DataFrame,
state: &mut ExecutionState,
) -> PolarsResult<DataFrame> {
// Vertical parallelism.
let df = if self.streamable && df.n_chunks() > 1 && df.height() > 0 {
let chunks = df.split_chunks().collect::<Vec<_>>();
let iter = chunks.into_par_iter().map(|df| {
let s = self.predicate.evaluate(&df, state)?;
df.filter(series_to_mask(&s)?)
});
if self.has_window {
state.insert_has_window_function_flag()
}
let s = self.predicate.evaluate(&df, state)?;
if self.has_window {
state.clear_window_expr_cache()
}
df.filter(series_to_mask(&s)?)
}

let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
accumulate_dataframes_vertical_unchecked(df)
} else {
if self.has_window {
state.insert_has_window_function_flag()
}
fn execute_chunks(
&mut self,
chunks: Vec<DataFrame>,
state: &ExecutionState,
) -> PolarsResult<DataFrame> {
let iter = chunks.into_par_iter().map(|df| {
let s = self.predicate.evaluate(&df, state)?;
if self.has_window {
state.clear_window_expr_cache()
df.filter(series_to_mask(&s)?)
});
let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
Ok(accumulate_dataframes_vertical_unchecked(df))
}

fn execute_impl(
&mut self,
df: DataFrame,
state: &mut ExecutionState,
) -> PolarsResult<DataFrame> {
let n_partitions = POOL.current_num_threads();
// Vertical parallelism.
if self.streamable && df.height() > 0 {
if df.n_chunks() > 1 {
let chunks = df.split_chunks().collect::<Vec<_>>();
self.execute_chunks(chunks, state)
} else if df.width() < n_partitions {
self.execute_hor(df, state)
} else {
let chunks = df.split_chunks_by_n(n_partitions, true);
self.execute_chunks(chunks, state)
}
df.filter(series_to_mask(&s)?)?
};
Ok(df)
} else {
self.execute_hor(df, state)
}
}
}

Expand Down
19 changes: 18 additions & 1 deletion crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,25 @@ pub fn create_physical_plan(
Ok(Box::new(executors::SliceExec { input, offset, len }))
},
Filter { input, predicate } => {
let streamable = is_streamable(predicate.node(), expr_arena, Context::Default);
let mut streamable = is_streamable(predicate.node(), expr_arena, Context::Default);
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
if streamable {
// This can cause problems with string caches
streamable = !input_schema
.iter_dtypes()
.any(|dt| dt.contains_categoricals())
|| {
#[cfg(feature = "dtype-categorical")]
{
polars_core::using_string_cache()
}

#[cfg(not(feature = "dtype-categorical"))]
{
false
}
}
}
let input = create_physical_plan(input, lp_arena, expr_arena)?;
let mut state = ExpressionConversionState::default();
let predicate = create_physical_expr(
Expand Down
Loading