From 3246521cedbc81e3e5e81957ebfc33e2b09cc164 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Thu, 9 Jan 2025 17:44:22 -0800
Subject: [PATCH] revert to previous approach

---
 .../org/apache/iceberg/parquet/Parquet.java       | 74 +++++++++++++------
 .../parquet/TestBloomRowGroupFilter.java          |  2 +-
 2 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index b3fda2554abc..310435209bac 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,6 +55,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -95,6 +96,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PropertyUtil;
@@ -115,8 +117,12 @@
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Parquet {
+  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
+
   private Parquet() {}
 
   private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
@@ -266,6 +272,43 @@ private WriteBuilder createContextFunc(
       return this;
     }
 
+    private void setBloomFilterConfig(
+        Context context,
+        MessageType parquetSchema,
+        BiConsumer<String, Boolean> withBloomFilterEnabled,
+        BiConsumer<String, Double> withBloomFilterFPP) {
+
+      Map<Integer, String> fieldIdToParquetPath =
+          parquetSchema.getColumns().stream()
+              .collect(
+                  Collectors.toMap(
+                      col -> col.getPrimitiveType().getId().intValue(),
+                      col -> String.join(".", col.getPath())));
+
+      context
+          .columnBloomFilterEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                Types.NestedField fieldId = schema.findField(colPath);
+                if (fieldId == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
+                  return;
+                }
+
+                String parquetColumnPath = fieldIdToParquetPath.get(fieldId.fieldId());
+                if (parquetColumnPath == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", fieldId);
+                  return;
+                }
+
+                withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+                String fpp = context.columnBloomFilterFpp().get(colPath);
+                if (fpp != null) {
+                  withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
+                }
+              });
+    }
+
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
@@ -285,8 +328,6 @@ public <D> FileAppender<D> build() throws IOException {
       int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
       int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
-      Map<String, String> columnBloomFilterFpp = context.columnBloomFilterFpp();
-      Map<String, String> columnBloomFilterEnabled = context.columnBloomFilterEnabled();
       boolean dictionaryEnabled = context.dictionaryEnabled();
 
       if (compressionLevel != null) {
@@ -343,17 +384,8 @@ public <D> FileAppender<D> build() throws IOException {
               .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
               .withMaxBloomFilterBytes(bloomFilterMaxBytes);
 
-      for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
-        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
-        String bloomEnabled = entry.getValue();
-        propsBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
-      }
-
-      for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
-        String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
-        String fpp = entry.getValue();
-        propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-      }
+      setBloomFilterConfig(
+          context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);
 
       ParquetProperties parquetProperties = propsBuilder.build();
 
@@ -386,17 +418,11 @@ public <D> FileAppender<D> build() throws IOException {
                 .withDictionaryPageSize(dictionaryPageSize)
                 .withEncryption(fileEncryptionProperties);
 
-        for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
-          String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
-          String bloomEnabled = entry.getValue();
-          parquetWriteBuilder.withBloomFilterEnabled(colPath, Boolean.parseBoolean(bloomEnabled));
-        }
-
-        for (Map.Entry<String, String> entry : columnBloomFilterFpp.entrySet()) {
-          String colPath = AvroSchemaUtil.makeCompatibleName(entry.getKey());
-          String fpp = entry.getValue();
-          parquetWriteBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-        }
+        setBloomFilterConfig(
+            context,
+            type,
+            parquetWriteBuilder::withBloomFilterEnabled,
+            parquetWriteBuilder::withBloomFilterFPP);
 
         return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
       }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index 2812d915176c..bfa511c91279 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -195,7 +195,7 @@ public void createInputFile() throws IOException {
     // build struct field schema
     org.apache.avro.Schema structSchema = AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
 
-    String compatibleFieldName = AvroSchemaUtil.makeCompatibleName("_incompatible-name");
+    String compatibleFieldName = "_incompatible_x2Dname";
 
     OutputFile outFile = Files.localOutput(temp);
     try (FileAppender<Record> appender =
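
For context, a minimal sketch (not part of this patch) of how per-column bloom filter settings reach Context.columnBloomFilterEnabled() / columnBloomFilterFpp(), which the restored setBloomFilterConfig(...) then maps to Parquet column paths at build() time. The schema, output path, table name, and column name "id" below are placeholders, and it assumes Iceberg's write.parquet.bloom-filter-enabled.column.* and write.parquet.bloom-filter-fpp.column.* property prefixes.

// Illustrative sketch only; schema, path, and column names are placeholders.
import java.io.File;
import java.io.IOException;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

public class BloomFilterWriteSketch {
  public static void main(String[] args) throws IOException {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Column-level bloom filter settings are passed as plain string properties on the
    // write builder; the builder's Context parses them when build() runs, and
    // setBloomFilterConfig(...) resolves the Iceberg field names to Parquet column paths.
    FileAppender<org.apache.avro.generic.GenericData.Record> appender =
        Parquet.write(Files.localOutput(new File("/tmp/bloom-example.parquet")))
            .schema(schema)
            .named("example")
            .set("write.parquet.bloom-filter-enabled.column.id", "true")
            .set("write.parquet.bloom-filter-fpp.column.id", "0.01")
            .build();
    appender.close();
  }
}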