Spark: Adding simple custom partition sort order option to RewriteManifests Spark Action #9731
base: main
Conversation
```java
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// Check if these partitions are included in the spec
Preconditions.checkArgument(
```
Noting: if the user changes/specifies the `spec()` after this check has been done, we may wish to re-check this setup?
I'm not sure I understand the comment here?
Say there are 2 partition specs for this table. Spec1 partitions by `(a_bucket, b)`, Spec2 partitions by `(a_bucket_100, c_days)`. If you originally pass a valid `sort(b, a)`, then change the `.specId` to 2, the validated custom column order will now be inaccurate.
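The scenario above can be sketched with plain JDK collections (hypothetical field names from this comment; this is not the actual `Preconditions.checkArgument` call in the PR). Re-running the check against whichever spec is finally active avoids the stale validation:

```java
import java.util.List;
import java.util.Set;

public class SortColumnValidation {
    // Validate the requested sort columns against the active spec's field names.
    // If the spec can change after configuration, this must run again once the
    // final spec is chosen, otherwise the earlier validation is stale.
    static void validateSortColumns(Set<String> specFieldNames, List<String> sortColumns) {
        for (String column : sortColumns) {
            if (!specFieldNames.contains(column)) {
                throw new IllegalArgumentException(
                    "Cannot sort by '" + column
                        + "': not a partition field of the active spec " + specFieldNames);
            }
        }
    }

    public static void main(String[] args) {
        // Spec1 partitions by (a_bucket, b): this sort order is valid.
        validateSortColumns(Set.of("a_bucket", "b"), List.of("b", "a_bucket"));

        // Spec2 partitions by (a_bucket_100, c_days): the same order now fails,
        // which is why the check should follow the final specId selection.
        try {
            validateSortColumns(Set.of("a_bucket_100", "c_days"), List.of("b", "a_bucket"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```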
Suggest that we make the API based on partition transform names; I think this is probably the only thing we can do reasonably with multi-arg transforms.
Rev 2
Going to work on the function-based API update next. Open to doing so in a new PR or this one.

Looks like the CI test failed, @zachdisc could you take a look and fix? Overall, we have 2 options based on this thread: #9731 (comment)

option 1: we merge this and have a subsequent PR for the functional API and refactor the current change into it

Is there any preference for which way we go?

I'm not quite sure why that failed; I ran spotlessApply and saw a successful build locally before the last push. Regardless, I have two changes inbound, so I'll fix along with the new updates.
Rev 3
Thanks to @Nandhakumar Saravana for pointing this out, when clustering by a string. Change imminent for
```diff
@@ -90,13 +94,13 @@ public class RewriteManifestsSparkAction
   public static final String USE_CACHING = "use-caching";
   public static final boolean USE_CACHING_DEFAULT = false;

+  private List<String> partitionSortColumns = null;
```
nit: we follow `public static final` -> `private static final` -> `private final` -> `private` to order variables
Is this a problem? I have these attributes as being set to `null` but able to be redefined, so I think `private` is correct here; it's in line with the other attributes in this class. I do have this attribute in the wrong place though, moving in next rev.
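The ordering convention the reviewer describes can be sketched as follows (illustrative names only, not the actual `RewriteManifestsSparkAction` fields):

```java
import java.util.List;

public class FieldOrderSketch {
    // 1. public static final constants first
    public static final boolean USE_CACHING_DEFAULT = false;

    // 2. then private static final constants
    private static final String DEFAULT_APP = "rewrite-manifests";

    // 3. then private final instance fields
    private final int formatVersion = 2;

    // 4. then plain private (mutable) fields, like the new sort-columns option
    private List<String> partitionSortColumns = null;

    static String defaultApp() {
        return DEFAULT_APP;
    }

    int formatVersion() {
        return formatVersion;
    }

    List<String> partitionSortColumns() {
        return partitionSortColumns;
    }
}
```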
```diff
-    Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn, numManifests);
+    Column partitionColumn = df.col(clusteringColumnName);
+    Dataset<Row> transformedDF =
+        repartitionAndSort(df, partitionColumn, numManifests).drop(clusteringColumnName);
```
The column here is used more for repartitioning than for sorting. Sorting only happens for rows with the same clustering column value. I am starting to think we should call the API `repartition(List<String> partitionFields)` rather than `sort(List<String> partitionFields)`.
Happy to make that change, but IMHO the API is indicating how the entire forest of manifests is sorted, not necessarily how each tree is sorted. Like, it's sorting the `manifest_list` entries in the end, while the `manifest_files` are sorted further within that. Thoughts?

Though, I suppose it isn't really sorting the whole forest, it is clustering.
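The distinction under discussion can be modeled with plain JDK collections (hypothetical keys; this is not the Spark implementation): entries are clustered by a derived key, and ordering is only guaranteed within each cluster, which is the argument for a name like `cluster()` over `sort()`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ClusterVsSortSketch {
    // Hypothetical manifest entry: a clustering key plus a file path.
    record Entry(String clusterKey, String path) {}

    // Model of repartitionAndSort: group entries by key (repartition), then
    // order entries only within each group -- there is no total order across
    // groups, so the operation clusters rather than globally sorts.
    static Map<String, List<Entry>> clusterThenSortWithin(List<Entry> entries) {
        return entries.stream().collect(Collectors.groupingBy(
            Entry::clusterKey,
            TreeMap::new,
            Collectors.collectingAndThen(
                Collectors.toList(),
                group -> group.stream()
                    .sorted(Comparator.comparing(Entry::path))
                    .toList())));
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(
            new Entry("b", "f3"), new Entry("a", "f2"),
            new Entry("a", "f1"), new Entry("b", "f4"));
        System.out.println(clusterThenSortWithin(entries));
    }
}
```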
@nastra @jackye1995 thoughts on a general rename to `cluster()` for this API action?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
I rebased locally, resolved merge conflicts, and addressed Russel's ask to remove the function-based rewrite sorting.

Please let me know the best way to deal with merge conflicts. I thought to rebase and get everything back in sync with the main branch, but that looks like the wrong flow here. I can close this PR and reopen a fresh one if that's preferred.

Created a fresh PR with the feedback here incorporated: #11881. This got messy with the accumulated changes and conflicts (paired with my likely messing things up with a rebase). Suggest closing this out in favor of the fresh start above.
What

This adds a simple `sort` method to the `RewriteManifests` Spark action which lets the user specify the partition column order to consider when grouping manifests.

Closes #9615
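The call shape of the proposed option can be illustrated with a small stub (hypothetical stand-in for the builder on `RewriteManifestsSparkAction`; the real action also needs a table and a Spark session):

```java
import java.util.Arrays;
import java.util.List;

// Stub that only records the requested partition column order, so the
// fluent call shape proposed in this PR is visible without Spark/Iceberg.
public class RewriteManifestsSketch {
    private List<String> sortColumns = List.of();

    RewriteManifestsSketch sort(String... columns) {
        this.sortColumns = Arrays.asList(columns);
        return this; // fluent, like the action's other options
    }

    List<String> sortColumns() {
        return sortColumns;
    }

    public static void main(String[] args) {
        // Proposed usage: cluster manifests by event_time first, then region,
        // instead of the default spec order.
        RewriteManifestsSketch action =
            new RewriteManifestsSketch().sort("event_time", "region");
        System.out.println(action.sortColumns());
    }
}
```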
Why

Iceberg's metadata is organized into a forest of `manifest_file`s which point to data files sharing common partitions. By default, and during `RewriteManifests`, the partition grouping is determined by the default `Spec` partition order. If the primary query pattern is more aligned with the last partition in the table's spec, manifests are poorly suited to quickly plan and prune around those partitions. E.g.:
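A plain-JDK sketch of the default behavior (hypothetical partition values; the real grouping happens over Iceberg partition tuples) for a spec ordered `(region, storeId, event_time_day)`:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DefaultSpecGrouping {
    // Hypothetical partition tuple for a spec ordered (region, storeId, event_time_day).
    record Partition(String region, int storeId, String eventDay) {}

    // Default behavior modeled: manifests group by the leading spec column,
    // so each group can span a wide range of event days.
    static Map<String, List<Partition>> groupByLeadingColumn(List<Partition> partitions) {
        return partitions.stream().collect(Collectors.groupingBy(
            Partition::region, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Partition> partitions = List.of(
            new Partition("us", 1, "2024-01-01"),
            new Partition("us", 2, "2024-06-30"),
            new Partition("eu", 1, "2024-01-01"),
            new Partition("eu", 2, "2024-06-30"));

        // Each region group mixes distant event days: an event_time-only
        // query cannot prune any group.
        System.out.println(groupByLeadingColumn(partitions));
    }
}
```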
Will create manifests that first group by `region`, whose `manifest_file` contents may span a wide range of `event_time` values. For a primary query pattern that doesn't care about `region`, `storeId`, etc., this leads to inefficient queries.

Requested Feedback and decisions
- I propose the arguments to `sort` be the raw column names used in partitioning, not the internal hidden ones. AKA `event_time` instead of `event_time_day`, and `foo` instead of `foo_bucket_1234`. Thoughts? Could readily allow both, or just the real, hidden partition column names if people prefer.
- I considered a function-based `sort(row -> {... return groupingString})`, but was struggling to express a Java `Function`-like input that could be used on a `Dataset<Row>`'s `data_file` struct. Any pointers on how to parse a `Row`'s struct into a native POJO and supply a function that can be used like in a UDF here would be appreciated!
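The first bullet (accepting raw source column names and mapping them to the hidden partition field names) could look roughly like this sketch, with a hard-coded source-to-field map standing in for what a real implementation would derive from the table's `PartitionSpec`:

```java
import java.util.List;
import java.util.Map;

public class PartitionNameResolutionSketch {
    // Hypothetical mapping from source column -> hidden partition field name,
    // as produced by transforms like day(event_time) or bucket(1234, foo).
    static final Map<String, String> SOURCE_TO_PARTITION_FIELD = Map.of(
        "event_time", "event_time_day",
        "foo", "foo_bucket_1234");

    // Accept either form: raw source names resolve to their partition field,
    // while names already in hidden form pass through unchanged.
    static List<String> resolve(List<String> requested) {
        return requested.stream()
            .map(c -> SOURCE_TO_PARTITION_FIELD.getOrDefault(c, c))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of("event_time", "foo_bucket_1234")));
    }
}
```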