-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merge partition columns into scan statistics for data skipping #615
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #615 +/- ##
==========================================
+ Coverage 83.45% 83.47% +0.02%
==========================================
Files 74 74
Lines 16877 17038 +161
Branches 16877 17038 +161
==========================================
+ Hits 14084 14223 +139
- Misses 2135 2138 +3
- Partials 658 677 +19 ☔ View full report in Codecov by Sentry. |
Digging into the failures that are caused when trying to compile in the |
I'm not sure the exact best way to do this, but my first inclination would be to do something along the lines of building an output struct of the partition values and then using a conditional expression to take the partition values if they exist or the stats values if not. But it looks like the expressions don't have conditionals, so that's not useful. I'm going to think about it, but I don't know the best way forward right now. |
@timsaucer - thanks for contributing and you are correct, outside the default/arrow engine implementation, we want to make no assumption about the representation of data. I believe many things are on the right track - i.e. extending the stats with min/max values for the partition columns. The general idea would likely be to create an expression that assigns constants values extracted from the file actions - i.e a Not entirely sure, but there were some discussions on how we can tell the engine the best way to generate some data for us. IIRC, expressions might be the way to go, but @nicklan may have some more recent thoughts on this. Does this already help a bit? If not, I could dig a bit more into the code and try to give some more concrete hints. |
Thank you. I appreciate the feedback. I’m going to think about it. I’ve got very limited time over the next week but I hope to get this resolved soon. For my use case, this will majorly impact how well the predicates work. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The general idea would likely be to create an expression that assigns constants values extracted from the file actions - i.e a Struct that assigns literals to the respective fields, and then let the engine evaluate that expression.
Not entirely sure, but there were some discussions on how we can tell the engine the best way to generate some data for us. IIRC, expressions might be the way to go, but @nicklan may have some more recent thoughts on this.
AFAIK, we have four challenges here:
- Extracting the requested partition values from the
add.partitionValues
string-string map. - Parsing each string partition value into its proper type
- Assembling the partition values into a proper struct
- Teach
trait DataSkippingPredicateEvaluator
and/or the impl of that trait forDataSkippingPredicateCreator
to consume that struct (more below)
I can think of two main ways to approach 1/2/3:
- (a) Use table-level expressions. We don't currently have any kind of map extractor expression (@hntd187 did some exploring a while ago, see Map access for expressions #352), and we also don't have any from-string casting expressions. Once we had those two, it's a simple matter to emit the desired
struct
expression.- PRO: One and done -- a single "table-level" expression" applies to all data chunks we process
- PRO: Engine (not kernel) does the hard work of map probing and from-string casting.
- CON: Complexity. Two whole new kinds of expressions we need to define and support.
- CON: Higher risk that engine's string parsing semantics do not match the Delta spec
- (b) Use row visitor API to extract partition values and emit file-level expression literals. This is what the duckdb extension currently does (if you squint)
- PRO: The row visitor API already provides map access, and PrimitiveType::parse_scalar already provides the needed from-string capability.
- PRO: No worries about engine's string parsing semantics, kernel handles it
- CON: Row visitor API will be inefficient for big tables with lots of file actions.
- CON: Per-file expressions are more expensive to create and manage than per-table expressions. But this is less of a concern because other Delta features will anyway require per-file expression support (so why not use it here as well).
I favor per-file expressions leveraging the row visitor API, because it uses building blocks we already have. And kernel's mantra is "simplicity" and "don't pessimize" -- rather than "optimize" -- so it's ok if this is not the absolute most efficient approach possible.
my first inclination would be to do something along the lines of building an output struct of the partition values and then using a conditional expression to take the partition values if they exist or the stats values if not. But it looks like the expressions don't have conditionals, so that's not useful
This relates to 4/ from the list of challenges above. Fortunately, the set of partition columns is fixed for any table, so we don't need runtime conditional expressions -- instead, the data skipping expression generator can track the set of partition columns and conditionally emit stats- or partition-based data skipping expressions as appropriate.
One possibility is to change the data skipping predicate impl of get_min_stat
and get_max_stat
to conditionally return an expression over the partition value instead of the (non-existing) stat. See https://github.com/delta-io/delta-kernel-rs/blob/main/kernel/src/scan/data_skipping.rs#L170-L178. That would be simple, and works perfectly for gt/lt/ge/le, but is sub-optimal for eq/ne because the current skipping logic has to assume min != max. That approach would also just plain fail for null checking predicates, because there are no rowcount or nullcount stats for partition colums (instead, the partition value itself is either null or non-null).
We could improve the eq/ne case by making DataSkippingPredicateEvaluator::eval_eq directly aware of partition values, instead of relying on eval_partial_cmp
. Similarly, we could improve the null check case by making DataSkippingPredicate::eval_is_null for DataSkippingPredicateCreator
directly aware of partition values.
.map(|(_, v)| v.to_string()) | ||
}) | ||
}) | ||
.collect::<Vec<Option<String>>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use Itertools::collect_vec
to avoid the turbofish
let mut kv = | ||
keys.iter() | ||
.zip(values.iter()) | ||
.filter_map(|(k, v)| match (k, v) { | ||
(Some(k), Some(v)) => Some((k, v)), | ||
_ => None, | ||
}); | ||
|
||
kv.find(|(k, _)| *k == key.as_str()) | ||
.map(|(_, v)| v.to_string()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just:
let mut kv = | |
keys.iter() | |
.zip(values.iter()) | |
.filter_map(|(k, v)| match (k, v) { | |
(Some(k), Some(v)) => Some((k, v)), | |
_ => None, | |
}); | |
kv.find(|(k, _)| *k == key.as_str()) | |
.map(|(_, v)| v.to_string()) | |
keys | |
.iter() | |
.zip(values.iter()) | |
.filter_map(|(k, v)| match (k, v) { | |
(Some(k), Some(v)) if *k == key.as_str() => Some(v.to_string()), | |
_ => None, | |
}) | |
.last() |
(have to return the last match because arrow and parquet maps are allowed to have duplicate keys, last key wins)
let partitions_column = match partitions_batch.column_by_name("output") { | ||
Some(c) => c, | ||
None => return Ok(stats), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
let partitions_column = match partitions_batch.column_by_name("output") { | |
Some(c) => c, | |
None => return Ok(stats), | |
}; | |
let Some(partitions_column) = partitions_batch.column_by_name("output") else { | |
return Ok(stats); | |
}; |
match field.name().as_str() { | ||
"minValues" => columns.push(merge_partition_fields_into_stats( | ||
stats_batch, | ||
idx, | ||
&partition_values, | ||
)?), | ||
"maxValues" => columns.push(merge_partition_fields_into_stats( | ||
stats_batch, | ||
idx, | ||
&partition_values, | ||
)?), | ||
_ => { | ||
columns.push(Arc::clone(stats_batch.column(idx))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
match field.name().as_str() { | |
"minValues" => columns.push(merge_partition_fields_into_stats( | |
stats_batch, | |
idx, | |
&partition_values, | |
)?), | |
"maxValues" => columns.push(merge_partition_fields_into_stats( | |
stats_batch, | |
idx, | |
&partition_values, | |
)?), | |
_ => { | |
columns.push(Arc::clone(stats_batch.column(idx))); | |
} | |
let column = match field.name().as_str() { | |
"minValues" | "maxValues" => merge_partition_fields_into_stats( | |
stats_batch, | |
idx, | |
&partition_values, | |
)?, | |
_ => Arc::clone(stats_batch.column(idx)), | |
}; | |
columns.push(column); |
I don't understand what this code is doing tho? What should happen if a stat other than min or max is requested for a partition column?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: Figured it out -- partition values only get merged in for min/max stats; other stats remain unchanged.
.column(idx) | ||
.as_any() | ||
.downcast_ref::<StructArray>() | ||
.ok_or_else(|| Error::engine_data_type("minValues"))? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this method also get called for maxValues
?
Ok(Arc::new(StructArray::new(fields, arrays, nulls)) | ||
as Arc<(dyn arrow_array::Array + 'static)>) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the cast necessary? I thought Arc decayed?
Ok(Arc::new(StructArray::new(fields, arrays, nulls)) | |
as Arc<(dyn arrow_array::Array + 'static)>) | |
let array = Arc::new(StructArray::new(fields, arrays, nulls)); | |
Ok(array) |
or, if the cast is unavoidable:
Ok(Arc::new(StructArray::new(fields, arrays, nulls)) | |
as Arc<(dyn arrow_array::Array + 'static)>) | |
Ok(Arc::new(StructArray::new(fields, arrays, nulls)) as _) |
I just noticed that #607 is already starting to implement (b), tho I suppose it could also be adapted to do (a) instead (it adds supports for both table-level and file-level expressions). |
Thank you @scovich . This is incredibly helpful. I'll try to start digging into the row visitor API and how to apply it in this case. I'm moving this PR to draft since it will obviously need a fairly big change. |
Closing in favor of #624 |
What changes are proposed in this pull request?
Currently the data skipping feature is comparing a predicate to the
minValues
andmaxValues
in the addition log. When the user provides a predicate that needs to match against a partition value, these values are not getting included in the test. Instead every log passes this filter. With this change, we take the values in the partitions and add them to the statistics record batch so that we can match against them to reduce the returned number of logs.How was this change tested?
Tested against my current workflow in DataFusion where I monitor the number of record batches returned from a query with a predicate assigned.
I have expanded a unit test to include a partition based predicate.