Skip to content

Commit

Permalink
feat: Imprive window function caching strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 19, 2025
1 parent d9f3e3d commit 4d7f144
Showing 1 changed file with 71 additions and 44 deletions.
115 changes: 71 additions & 44 deletions crates/polars-mem-engine/src/executors/projection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,51 +49,77 @@ fn window_evaluate(
state: &ExecutionState,
window: PlHashMap<String, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
POOL.install(|| {
window
.par_iter()
.map(|(_, partition)| {
// clear the cache for every partitioned group
let mut state = state.split();
// inform the expression it has window functions.
state.insert_has_window_function_flag();

// caching more than one window expression is a complicated topic for another day
// see issue #2523
let cache = partition.len() > 1
&& partition.iter().all(|(_, e)| {
e.as_expression()
.unwrap()
.into_iter()
.filter(|e| matches!(e, Expr::Window { .. }))
.count()
== 1
});
let mut first_result = None;
// First run 1 to fill the cache. Condvars and such don't work as
// rayon threads should not be blocked.
if cache {
let first = &partition[0];
let c = first.1.evaluate(df, &state)?;
first_result = Some((first.0, c));
state.insert_cache_window_flag();
} else {
state.remove_cache_window_flag();
}
let n_threads = POOL.current_num_threads();

let mut results = partition[first_result.is_some() as usize..]
.par_iter()
.map(|(index, e)| e.evaluate(df, &state).map(|c| (*index, c)))
.collect::<PolarsResult<Vec<_>>>()?;
let max_hor = window.values().map(|v| v.len()).max().expect("length > 0");
let vert = window.len();

if let Some(item) = first_result {
results.push(item)
}
// We don't want to cache and parallel horizontally and vertically as that keeps many cache
// states alive.
let (cache, par_vertical, par_horizontal) = if max_hor >= n_threads || max_hor >= vert {
(true, false, true)
} else {
(false, true, true)
};

Ok(results)
})
.collect()
})
let apply = |partition: &[(u32, Arc<dyn PhysicalExpr>)]| {
// clear the cache for every partitioned group
let mut state = state.split();
// inform the expression it has window functions.
state.insert_has_window_function_flag();

// caching more than one window expression is a complicated topic for another day
// see issue #2523
let cache = cache
&& partition.len() > 1
&& partition.iter().all(|(_, e)| {
e.as_expression()
.unwrap()
.into_iter()
.filter(|e| matches!(e, Expr::Window { .. }))
.count()
== 1
});
let mut first_result = None;
// First run 1 to fill the cache. Condvars and such don't work as
// rayon threads should not be blocked.
if cache {
let first = &partition[0];
let c = first.1.evaluate(df, &state)?;
first_result = Some((first.0, c));
state.insert_cache_window_flag();
} else {
state.remove_cache_window_flag();
}

let apply =
|index: &u32, e: &Arc<dyn PhysicalExpr>| e.evaluate(df, &state).map(|c| (*index, c));

let slice = &partition[first_result.is_some() as usize..];
let mut results = if par_horizontal {
slice
.par_iter()
.map(|(index, e)| apply(index, e))
.collect::<PolarsResult<Vec<_>>>()?
} else {
slice
.iter()
.map(|(index, e)| apply(index, e))
.collect::<PolarsResult<Vec<_>>>()?
};

if let Some(item) = first_result {
results.push(item)
}

Ok(results)
};

if par_vertical {
POOL.install(|| window.par_iter().map(|t| apply(t.1)).collect())
} else {
window.iter().map(|t| apply(t.1)).collect()
}
}

fn execute_projection_cached_window_fns(
Expand Down Expand Up @@ -130,8 +156,9 @@ fn execute_projection_cached_window_fns(
} = e
{
let entry = match options {
WindowType::Over(_) => {
let mut key = format!("{:?}", partition_by.as_slice());
WindowType::Over(g) => {
let g: &str = g.into();
let mut key = format!("{:?}_{}", partition_by.as_slice(), g);
if let Some((e, k)) = order_by {
polars_expr::prelude::window_function_format_order_by(
&mut key,
Expand Down

0 comments on commit 4d7f144

Please sign in to comment.