Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data: Add partition stats writer and reader #11216

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

ajantha-bhat
Copy link
Member

@ajantha-bhat ajantha-bhat commented Sep 26, 2024

Introduce APIs to write the partition stats into files in table default format using Iceberg generic writers and readers.

PartitionStatisticsFile partitionStatisticsFile =
        PartitionStatsHandler.computeAndWriteStatsFile(testTable, "b1");

testTable.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile).commit();


@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public boolean equals(Object other) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StructLikeMap was previously handling this implicitly. But when PartitionStatsRecord wraps PartitionStats now for the writers, it needs to override equals and hashcode

StructLike coercedPartition =
PartitionUtil.coercePartition(partitionType, spec, file.partition());
StructLike key = keyTemplate.copyFor(coercedPartition);
Record key = coercedPartitionRecord(file, spec, partitionType);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need Record instead of PartitionData for the writers.

Cannot keep this conversion in the data module as it just to wraps the same PartitionStats object.


/** Wraps the {@link PartitionStats} as {@link Record}. Used by generic writers and readers. */
public class PartitionStatsRecord implements Record, StructLike {
private static final LoadingCache<StructType, Map<String, Integer>> NAME_MAP_CACHE =
Copy link
Member Author

@ajantha-bhat ajantha-bhat Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class is similar to GenericRecord but for a specific partition stats schema.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused why we need a special class for this? GenericRecord should work right? Also Record already implements StructLike so that's unnecessary

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused why we need a special class for this? GenericRecord should work right?

I got a review comment perviously from Anton that keeping the Record in the public interface of writer and readers is fragile. So, New class introduced which is less fragile (coupled with partition stats schema and just wraps the PartitionStats).

#10176 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why we need a special class here still? His comment is just to remove Record from the public interface which has been done. I don't think creating a new special class (which is public) is necessary since the records only exist within private handler code? - @aokolnychyi was in Europe last I checked but when he is back he can check it out.

Copy link
Member Author

@ajantha-bhat ajantha-bhat Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, GenericRecord can't wrap the PartitionStats, it maintains its own data array.

Schema schema,
PartitionSpec spec,
int formatVersion,
Map<String, String> properties) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no option to pass the table properties before.
Needed to pass different file format for paramterized test.

@ajantha-bhat ajantha-bhat added this to the Iceberg 1.7.0 milestone Sep 27, 2024
@ajantha-bhat
Copy link
Member Author

@aokolnychyi: This PR is ready. But as we discussed previously, this PR wraps the PartitionStats into a Record as the writers cannot work with Iceberg internal objects yet.

I will explore adding the internal writers for Parquet and Orc. Similar to #11108.
If we fail to have it ready by 1.7.0, I think it makes sense to merge this PR and introduce the optimized writer in the next version by deprecating this writer.

@ajantha-bhat ajantha-bhat marked this pull request as ready for review September 27, 2024 02:00
@ajantha-bhat ajantha-bhat mentioned this pull request Oct 16, 2024
11 tasks
@ajantha-bhat
Copy link
Member Author

@RussellSpitzer: It would be good to have this in 1.7.0.
I am waiting from a month for a review.

@aokolnychyi
Copy link
Contributor

I think we should try to use "internal" writers. @rdblue added "internal" readers recently.

Any guidance on how to add a writer, @rdblue? We can start with Avro for now. We will also need such readers/writers for Parquet.

@ajantha-bhat
Copy link
Member Author

ajantha-bhat commented Oct 24, 2024

@aokolnychyi, @rdblue:

I already tried POC for internal writers on another branch,
c209bc9

The problems:
a) I am using PartitionData instead of Record for partition value, but the PartitionData get() method wraps the byte array to the byte buffer, which is a problem for internal writers, they expect byte[]. So, I didn't felt like using a new class instead of PartitionData just for this.

b) Also, Using partitionData in StructLikeMap is not working fine. Some keys are missing in the map (looks like equals() logic), If I use Record, it is fine.

Maybe in the next version we can have optimized writer and reader (without converter using internal reader and writers).
For end user it doesn't make any difference as new readers can also read the old partition stats parquet file and old readers can read the new partition stats parquet file. So, can we merge this?

}

PartitionStats that = (PartitionStats) other;
return Objects.equals(partition, that.partition)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StructLike doesn't have equal, I think you need to use StructLikeComparator here

Copy link
Member Author

@ajantha-bhat ajantha-bhat Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are storing Record as Partition from coercedPartitionRecord().
Since GenericRecord has equals implemented, calling Objects.equals is working here.
Hence, I didn't add comparator logic.

But agree that need to understand the implementation logic, just by looking at this class, it looks like we need comparator logic. I can update it if it is necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also tried adding the comparator logic today and pass the comparator of partition type.

Since, we are converting the partition values for the writer in PartitionStatsHandler.statsToRecords().
Comparator expects integer value for date column but we have converted the values to LocalDate, hence comparison fails.

If I don't use the comparator, Record.equals() will be called which does array compare and passes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but we can't assume subtype here unless you want to assert and change the field type above. If we say something is structLike we can't assume it behaves like a record (even if given the current code we know it won't.) If you want it to be a record you need to cast it and assert earlier in the class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I have added the assert (preconditions) to make sure it is always of the type Record. Also, added a comment that why keeping the type as StructType instead of Record when it is always a record.

It is because in future when we introduce internal parquet writers that works with StructLike instead of Record, we don't have to change method signatures and it will be compatible.

@RussellSpitzer
Copy link
Member

Moving out of 1.7.0 since we still have a bit of discussion here

@ajantha-bhat
Copy link
Member Author

ajantha-bhat commented Nov 19, 2024

@RussellSpitzer: I have added the Assertion for Partition type as you suggested and replied to #11216 (comment), do you have anymore comments for this PR?

@aokolnychyi
Copy link
Contributor

I had a conversation with @rdblue today about internal writers. Ryan should have a bit of time to help/guide.
I will check the current implementation today too.

@@ -205,6 +211,8 @@ public <T> T get(int pos, Class<T> javaClass) {
public <T> void set(int pos, T value) {
switch (pos) {
case 0:
Preconditions.checkArgument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels a bit awkward to rely on Record for a nested field while the main object is simply StructLike.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can keep the member as Record instead of StructLike to avoid this. But since we have plan to use internal writers in future (which uses StructLike), we lose compatibility if we keep the member as Record instead of StructLike.

I don't think it is too awkward as Record implements StructLike.

}

@VisibleForTesting
static Iterator<PartitionStatsRecord> statsToRecords(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are doing a lot of logic here that wouldn't be needed with internal readers and writers. Let's at least estimate the amount of work to get the internal writer for Avro, to begin with. Any thoughts, @rdblue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's at least estimate the amount of work to get the internal writer for Avro

This PR needs internal writer for parquet and orc as well not just Avro.
Considering 1.8.0 is planned end of this month and we have holidays coming up next month, I don't want to miss the release train again (like 1.7.0)

We are waiting for partition stats from long time (almost an year) and this PR is implemented based on what is available in the current Iceberg. I too agree that having internal writers will be nice. But it can be added in future and PR is designed such that we can replace current writers with internal writers without losing compatibility.

So, I don't see a reason to block the development of this feature.
Merging this PR will complete milestone for partition stats.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And regarding effort for internal writers, I tried a POC last time just for parquet.
ajantha-bhat@c209bc9

Introduced GenericStructParquetWriter used by GenericStructFileWriterFactory.
As It uses BaseParquetWriter which expects LocalDate for date type instead of Int type and so on for other types. Should we use BaseParquetWriter with converter for internal writers or we should go and refactor the ParquetValueWriters and ColumnWriter was my doubt.

Also, If we use writers with converters, StructLike comparator will fail as it need int type for date but the final value is LocalDate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need internal writers here ? I understand it's an improvement, but I don't think it's a blocker for this PR. The effort is ongoing for a long time now, I would be more in favor of moving forward soon and plan internal writers improvement in a second step.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue, @aokolnychyi, @RussellSpitzer: Can we please conclude on this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jbonofre
Copy link
Member

@RussellSpitzer @aokolnychyi I'm reviewing the stale PRs, and this one is open for month. Do we have a way to move forward ? I can do a new review, but at the end of the day, it won't help for the merge (as only committers can merge PR).

@deniskuzZ
Copy link
Member

Thanks @ajantha-bhat for your work on partition stats support in Iceberg! That could be reused in Hive as a building block for apache/hive#5498

@danielcweeks danielcweeks requested a review from rdblue December 4, 2024 16:39
@jbonofre
Copy link
Member

@danielcweeks @RussellSpitzer @aokolnychyi would you have some time to take a look on this PR and my proposal (previous comment) ?

@regadas
Copy link
Contributor

regadas commented Dec 18, 2024

I just found this PR as I'm desperately looking for this functionality. Thanks @ajantha-bhat! Let's see if the review gets wrapped soon 🤞

* @param branch A branch information to select the required snapshot.
* @return {@link PartitionStatisticsFile} for the given branch.
*/
public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, String branch) {
Copy link
Member

@deniskuzZ deniskuzZ Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better not to have fat methods with multiple responsibilities? What if we introduce the write method that gets the stats iterator as an argument.
We might not need to execute the complete stats rebuild for all the registered partitions, but only for those that were changed in the current snapshot.
Snapshot summary already has a metric for the number of changed partitions, maybe we could extend it with the partition list and re-compute stats only for them. Then generate a new stats file based on the prev snapshot stats with updates to the changed partitions.

@deniskuzZ
Copy link
Member

deniskuzZ commented Jan 9, 2025

hi @ajantha-bhat, could you please check below:

it seems that Date, time, timestamp partition values are not properly serialized
see Type.TypeID

DATE(Integer.class),
TIME(Long.class),
TIMESTAMP(Long.class),

PartitionSpec.partitionToPath(PartitionStatsRecord.unwrap().partition()) thows an exception

Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Integer: 1999-12-31
	at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
	at org.apache.iceberg.PartitionSpec.get(PartitionSpec.java:195)
	at org.apache.iceberg.PartitionSpec.partitionToPath(PartitionSpec.java:213)

I think, instead of Record(1999-12-31) it should be Record{10956}

full code snippet

Types.StructType partitionType = Partitioning.partitionType(table);
Schema schema = PartitionStatsHandler.schema(partitionType);

CloseableIterable<PartitionStatsRecord> partitionStatsRecords = PartitionStatsHandler.readPartitionStatsFile(
    schema, table.io().newInputFile(statsFile.path()));

try (Closeable toClose = partitionStatsRecords) {
  PartitionStats partitionStats = Iterables.tryFind(partitionStatsRecords, stats -> {
        PartitionSpec spec = table.specs().get(stats.unwrap().specId());
        return spec.partitionToPath(stats.unwrap().partition()).equals(partish.getPartition().getName());
      })
      .transform(PartitionStatsRecord::unwrap)
      .orNull();

  if (partitionStats != null) {
    Map<String, String> stats = ImmutableMap.of(
        TOTAL_DATA_FILES_PROP, String.valueOf(partitionStats.dataFileCount()),
        TOTAL_RECORDS_PROP, String.valueOf(partitionStats.dataRecordCount()),
        TOTAL_EQ_DELETES_PROP, String.valueOf(partitionStats.equalityDeleteRecordCount()),
        TOTAL_POS_DELETES_PROP, String.valueOf(partitionStats.positionDeleteRecordCount()),
        TOTAL_FILE_SIZE_PROP, String.valueOf(partitionStats.totalDataFileSizeInBytes())
    );
    return stats;
}

@ajantha-bhat
Copy link
Member Author

@deniskuzZ: Thanks for testing out. We are working on Internal parquet/Avro/orc readers and writes. partition stats will use them. So, we don't need to go through these converters. I will retest all the data types once I use internal writers for partition stats.

PRs:
#11919
#11904

@deniskuzZ
Copy link
Member

@deniskuzZ: Thanks for testing out. We are working on Internal parquet/Avro/orc readers and writes. partition stats will use them. So, we don't need to go through these converters. I will retest all the data types once I use internal writers for partition stats.

PRs: #11919 #11904

JFYI, once I removed IdentityPartitionConverters.convertConstant from the statsToRecords->convertPartitionValues issue with partitionToPath method is gone.
IDK, maybe you had some other idea of how to do the partition filtering on the returned stats, but I used the following

spec.partitionToPath(stats.unwrap().partition())
  .equals(partish.getPartition().getName())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants