From 4d7f144e5d8f4dfa357c0a28bd7d782f4a23c031 Mon Sep 17 00:00:00 2001 From: ritchie Date: Sun, 19 Jan 2025 09:11:23 +0100 Subject: [PATCH] feat: Improve window function caching strategy --- .../src/executors/projection_utils.rs | 115 +++++++++++------- 1 file changed, 71 insertions(+), 44 deletions(-) diff --git a/crates/polars-mem-engine/src/executors/projection_utils.rs b/crates/polars-mem-engine/src/executors/projection_utils.rs index f8f5461c619..1443c137500 100644 --- a/crates/polars-mem-engine/src/executors/projection_utils.rs +++ b/crates/polars-mem-engine/src/executors/projection_utils.rs @@ -49,51 +49,77 @@ fn window_evaluate( state: &ExecutionState, window: PlHashMap>, ) -> PolarsResult>> { - POOL.install(|| { - window - .par_iter() - .map(|(_, partition)| { - // clear the cache for every partitioned group - let mut state = state.split(); - // inform the expression it has window functions. - state.insert_has_window_function_flag(); - - // caching more than one window expression is a complicated topic for another day - // see issue #2523 - let cache = partition.len() > 1 - && partition.iter().all(|(_, e)| { - e.as_expression() - .unwrap() - .into_iter() - .filter(|e| matches!(e, Expr::Window { .. })) - .count() - == 1 - }); - let mut first_result = None; - // First run 1 to fill the cache. Condvars and such don't work as - // rayon threads should not be blocked. - if cache { - let first = &partition[0]; - let c = first.1.evaluate(df, &state)?; - first_result = Some((first.0, c)); - state.insert_cache_window_flag(); - } else { - state.remove_cache_window_flag(); - } + let n_threads = POOL.current_num_threads(); - let mut results = partition[first_result.is_some() as usize..]
- .par_iter() - .map(|(index, e)| e.evaluate(df, &state).map(|c| (*index, c))) - .collect::>>()?; + let max_hor = window.values().map(|v| v.len()).max().expect("length > 0"); + let vert = window.len(); - if let Some(item) = first_result { - results.push(item) - } + // We don't want to cache and parallel horizontally and vertically as that keeps many cache + // states alive. + let (cache, par_vertical, par_horizontal) = if max_hor >= n_threads || max_hor >= vert { + (true, false, true) + } else { + (false, true, true) + }; - Ok(results) - }) - .collect() - }) + let apply = |partition: &[(u32, Arc)]| { + // clear the cache for every partitioned group + let mut state = state.split(); + // inform the expression it has window functions. + state.insert_has_window_function_flag(); + + // caching more than one window expression is a complicated topic for another day + // see issue #2523 + let cache = cache + && partition.len() > 1 + && partition.iter().all(|(_, e)| { + e.as_expression() + .unwrap() + .into_iter() + .filter(|e| matches!(e, Expr::Window { .. })) + .count() + == 1 + }); + let mut first_result = None; + // First run 1 to fill the cache. Condvars and such don't work as + // rayon threads should not be blocked. + if cache { + let first = &partition[0]; + let c = first.1.evaluate(df, &state)?; + first_result = Some((first.0, c)); + state.insert_cache_window_flag(); + } else { + state.remove_cache_window_flag(); + } + + let apply = + |index: &u32, e: &Arc| e.evaluate(df, &state).map(|c| (*index, c)); + + let slice = &partition[first_result.is_some() as usize..]; + let mut results = if par_horizontal { + slice + .par_iter() + .map(|(index, e)| apply(index, e)) + .collect::>>()? + } else { + slice + .iter() + .map(|(index, e)| apply(index, e)) + .collect::>>()? 
+ }; + + if let Some(item) = first_result { + results.push(item) + } + + Ok(results) + }; + + if par_vertical { + POOL.install(|| window.par_iter().map(|t| apply(t.1)).collect()) + } else { + window.iter().map(|t| apply(t.1)).collect() + } } fn execute_projection_cached_window_fns( @@ -130,8 +156,9 @@ fn execute_projection_cached_window_fns( } = e { let entry = match options { - WindowType::Over(_) => { - let mut key = format!("{:?}", partition_by.as_slice()); + WindowType::Over(g) => { + let g: &str = g.into(); + let mut key = format!("{:?}_{}", partition_by.as_slice(), g); if let Some((e, k)) = order_by { polars_expr::prelude::window_function_format_order_by( &mut key,