From bb3a4c5677b7798ec8ba2957fd9d2cf1225801b0 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Mon, 6 Jan 2025 16:26:14 +0530 Subject: [PATCH 1/4] Avro: Add internal writer --- .../iceberg/avro/BaseAvroSchemaVisitor.java | 117 +++++++++++++++ .../iceberg/avro/GenericAvroWriter.java | 89 +----------- .../apache/iceberg/avro/InternalReader.java | 1 - .../apache/iceberg/avro/InternalWriter.java | 74 ++++++++++ .../org/apache/iceberg/avro/ValueWriters.java | 17 +++ .../iceberg/avro/TestInternalWriter.java | 133 ++++++++++++++++++ 6 files changed, 346 insertions(+), 85 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/avro/BaseAvroSchemaVisitor.java create mode 100644 core/src/main/java/org/apache/iceberg/avro/InternalWriter.java create mode 100644 core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java diff --git a/core/src/main/java/org/apache/iceberg/avro/BaseAvroSchemaVisitor.java b/core/src/main/java/org/apache/iceberg/avro/BaseAvroSchemaVisitor.java new file mode 100644 index 000000000000..387df863f7c1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/BaseAvroSchemaVisitor.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.util.List; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +abstract class BaseAvroSchemaVisitor extends AvroSchemaVisitor> { + + protected abstract ValueWriter createRecordWriter(List> fields); + + protected abstract ValueWriter fixedWriter(int size); + + @Override + public ValueWriter record(Schema record, List names, List> fields) { + return createRecordWriter(fields); + } + + @Override + public ValueWriter union(Schema union, List> options) { + Preconditions.checkArgument( + options.contains(ValueWriters.nulls()), + "Cannot create writer for non-option union: %s", + union); + Preconditions.checkArgument( + options.size() == 2, "Cannot create writer for non-option union: %s", union); + if (union.getTypes().get(0).getType() == Schema.Type.NULL) { + return ValueWriters.option(0, options.get(1)); + } else { + return ValueWriters.option(1, options.get(0)); + } + } + + @Override + public ValueWriter array(Schema array, ValueWriter elementWriter) { + if (array.getLogicalType() instanceof LogicalMap) { + ValueWriters.StructWriter keyValueWriter = (ValueWriters.StructWriter) elementWriter; + return ValueWriters.arrayMap(keyValueWriter.writer(0), keyValueWriter.writer(1)); + } + + return ValueWriters.array(elementWriter); + } + + @Override + public ValueWriter map(Schema map, ValueWriter valueWriter) { + return ValueWriters.map(ValueWriters.strings(), valueWriter); + } + + @Override + public ValueWriter primitive(Schema primitive) { + LogicalType logicalType = primitive.getLogicalType(); + if (logicalType != null) { + switch (logicalType.getName()) { + case "date": + return ValueWriters.ints(); + + case "time-micros": + return ValueWriters.longs(); + + case "timestamp-micros": + return ValueWriters.longs(); + + case "decimal": + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; + return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); + + case "uuid": + return ValueWriters.uuids(); + + default: + throw new IllegalArgumentException("Unsupported logical type: " + logicalType); + } + } + + switch (primitive.getType()) { + case NULL: + return ValueWriters.nulls(); + case BOOLEAN: + return ValueWriters.booleans(); + case INT: + return ValueWriters.ints(); + case LONG: + return ValueWriters.longs(); + case FLOAT: + return ValueWriters.floats(); + case DOUBLE: + return ValueWriters.doubles(); + case STRING: + return ValueWriters.strings(); + case FIXED: + return fixedWriter(primitive.getFixedSize()); + case BYTES: + return ValueWriters.byteBuffers(); + default: + throw new IllegalArgumentException("Unsupported type: " + primitive); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java index 421bfc9dc462..ac3b9a275f04 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java @@ -21,12 +21,9 @@ import java.io.IOException; import java.util.List; import java.util.stream.Stream; -import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.io.Encoder; import org.apache.iceberg.FieldMetrics; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class GenericAvroWriter implements MetricsAwareDatumWriter { private ValueWriter writer = null; @@ -42,7 +39,7 @@ public static GenericAvroWriter create(Schema schema) { @Override @SuppressWarnings("unchecked") public void setSchema(Schema schema) { - this.writer = (ValueWriter) AvroSchemaVisitor.visit(schema, new WriteBuilder()); + this.writer = (ValueWriter) AvroSchemaVisitor.visit(schema, new GenericAvroSchemaVisitor()); } @Override @@ -55,92 +52,16 @@ public Stream metrics() { return writer.metrics(); } - private static class WriteBuilder extends AvroSchemaVisitor> { - private WriteBuilder() {} + private static class GenericAvroSchemaVisitor extends BaseAvroSchemaVisitor { @Override - public ValueWriter record(Schema record, List names, List> fields) { + protected ValueWriter createRecordWriter(List> fields) { return ValueWriters.record(fields); } @Override - public ValueWriter union(Schema union, List> options) { - Preconditions.checkArgument( - options.contains(ValueWriters.nulls()), - "Cannot create writer for non-option union: %s", - union); - Preconditions.checkArgument( - options.size() == 2, "Cannot create writer for non-option union: %s", union); - if (union.getTypes().get(0).getType() == Schema.Type.NULL) { - return ValueWriters.option(0, options.get(1)); - } else { - return ValueWriters.option(1, options.get(0)); - } - } - - @Override - public ValueWriter array(Schema array, ValueWriter elementWriter) { - if (array.getLogicalType() instanceof LogicalMap) { - ValueWriters.StructWriter keyValueWriter = (ValueWriters.StructWriter) elementWriter; - return ValueWriters.arrayMap(keyValueWriter.writer(0), keyValueWriter.writer(1)); - } - - return ValueWriters.array(elementWriter); - } - - @Override - public ValueWriter map(Schema map, ValueWriter valueWriter) { - return ValueWriters.map(ValueWriters.strings(), valueWriter); - } - - @Override - public ValueWriter primitive(Schema primitive) { - LogicalType logicalType = primitive.getLogicalType(); - if (logicalType != null) { - switch (logicalType.getName()) { - case "date": - return ValueWriters.ints(); - - case "time-micros": - return ValueWriters.longs(); - - case "timestamp-micros": - return ValueWriters.longs(); - - case "decimal": - LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; - return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); - - case "uuid": - return ValueWriters.uuids(); - - default: - throw new IllegalArgumentException("Unsupported logical type: " + logicalType); - } - } - - switch (primitive.getType()) { - case NULL: - return ValueWriters.nulls(); - case BOOLEAN: - return ValueWriters.booleans(); - case INT: - return ValueWriters.ints(); - case LONG: - return ValueWriters.longs(); - case FLOAT: - return ValueWriters.floats(); - case DOUBLE: - return ValueWriters.doubles(); - case STRING: - return ValueWriters.strings(); - case FIXED: - return ValueWriters.genericFixed(primitive.getFixedSize()); - case BYTES: - return ValueWriters.byteBuffers(); - default: - throw new IllegalArgumentException("Unsupported type: " + primitive); - } + protected ValueWriter fixedWriter(int size) { + return ValueWriters.genericFixed(size); } } } diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java index ca83ce2ba7cd..0dab3646d4d4 100644 --- a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java @@ -205,7 +205,6 @@ public ValueReader primitive(Pair partner, Schema primitive) { case STRING: return ValueReaders.strings(); case FIXED: - return ValueReaders.fixed(primitive); case BYTES: return ValueReaders.byteBuffers(); case ENUM: diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java b/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java new file mode 100644 index 000000000000..56d3f4ac5cc0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Stream; +import org.apache.avro.Schema; +import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.types.Type; + +/** + * A Writer that consumes Iceberg's internal in-memory object model. + * + *

Iceberg's internal in-memory object model produces the types defined in {@link + * Type.TypeID#javaClass()}. + */ +public class InternalWriter implements MetricsAwareDatumWriter { + private ValueWriter writer = null; + + public static InternalWriter create(Schema schema) { + return new InternalWriter<>(schema); + } + + InternalWriter(Schema schema) { + setSchema(schema); + } + + @Override + @SuppressWarnings("unchecked") + public void setSchema(Schema schema) { + this.writer = (ValueWriter) AvroSchemaVisitor.visit(schema, new GenericAvroSchemaVisitor()); + } + + @Override + public void write(T datum, Encoder out) throws IOException { + writer.write(datum, out); + } + + @Override + public Stream metrics() { + return writer.metrics(); + } + + private static class GenericAvroSchemaVisitor extends BaseAvroSchemaVisitor { + + @Override + protected ValueWriter createRecordWriter(List> fields) { + return ValueWriters.struct(fields); + } + + @Override + protected ValueWriter fixedWriter(int size) { + return ValueWriters.byteBuffers(); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java index 3844ede1c16a..b799cc760476 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java @@ -32,6 +32,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.Encoder; import org.apache.avro.util.Utf8; +import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; @@ -126,6 +127,10 @@ public static ValueWriter record(List> writers) { return new RecordWriter(writers); } + public static ValueWriter struct(List> writers) { + return new StructLikeWriter(writers); + } + private static class NullWriter implements ValueWriter { private static final NullWriter INSTANCE = new NullWriter(); @@ -484,4 +489,16 @@ protected Object get(IndexedRecord struct, int pos) { return struct.get(pos); } } + + private static class StructLikeWriter extends StructWriter { + @SuppressWarnings("unchecked") + private StructLikeWriter(List> writers) { + super(writers); + } + + @Override + protected Object get(StructLike struct, int pos) { + return struct.get(pos, Object.class); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java b/core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java new file mode 100644 index 000000000000..b1d22b478765 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.List; +import java.util.UUID; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestInternalWriter { + + @TempDir Path temp; + + @Test + public void testDataWriter() throws IOException { + Schema schema = + new Schema( + required(100, "b", Types.BooleanType.get()), + optional(101, "i", Types.IntegerType.get()), + required(102, "l", Types.LongType.get()), + optional(103, "f", Types.FloatType.get()), + required(104, "d", Types.DoubleType.get()), + optional(105, "date", Types.DateType.get()), + required(106, "time", Types.TimeType.get()), + required(107, "ts", Types.TimestampType.withoutZone()), + required(108, "ts_tz", Types.TimestampType.withZone()), + required(109, "s", Types.StringType.get()), + required(110, "uuid", Types.UUIDType.get()), + required(111, "fixed", Types.FixedType.ofLength(7)), + optional(112, "bytes", Types.BinaryType.get()), + required(113, "dec_38_10", Types.DecimalType.of(38, 10))); + + // Consuming the data as per Type.java + GenericRecord record = GenericRecord.create(schema); + record.set(0, true); + record.set(1, 42); + record.set(2, 42L); + record.set(3, 3.14f); + record.set(4, 3.141592653589793); + record.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value()); + record.set(6, Literal.of("10:10:10").to(Types.TimeType.get()).value()); + record.set( + 7, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value()); + record.set( + 8, + Literal.of("2017-11-29T11:30:07.123456+01:00").to(Types.TimestampType.withZone()).value()); + record.set(9, "string"); + record.set(10, UUID.randomUUID()); + record.set(11, ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6})); + record.set(12, ByteBuffer.wrap(new byte[] {1, 2, 3})); + record.set( + 13, Literal.of("12345678901234567890.1234567890").to(Types.DecimalType.of(38, 10)).value()); + + StructProjection structProjection = StructProjection.create(schema, schema); + StructProjection row = structProjection.wrap(record); + + OutputFile file = Files.localOutput(temp.toFile()); + + DataWriter dataWriter = + Avro.writeData(file) + .schema(schema) + .createWriterFunc(InternalWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build(); + + try { + dataWriter.write(row); + } finally { + dataWriter.close(); + } + + DataFile dataFile = dataWriter.toDataFile(); + + assertThat(dataFile.format()).as("Format should be Avro").isEqualTo(FileFormat.AVRO); + assertThat(dataFile.content()).as("Should be data file").isEqualTo(FileContent.DATA); + assertThat(dataFile.partition().size()).as("Partition should be empty").isEqualTo(0); + assertThat(dataFile.keyMetadata()).as("Key metadata should be null").isNull(); + + List writtenRecords; + try (AvroIterable reader = + Avro.read(file.toInputFile()) + .project(schema) + .createResolvingReader(InternalReader::create) + .build()) { + writtenRecords = Lists.newArrayList(reader); + } + assertThat(writtenRecords).hasSize(1); + Comparator structLikeComparator = Comparators.forType(schema.asStruct()); + assertThat(structLikeComparator.compare(writtenRecords.get(0), row)) + .as("Written records should match") + .isZero(); + } +} From 75864f997c8d7633b4b8360049a0a1d7cd0fbdfe Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Wed, 8 Jan 2025 11:18:46 +0530 Subject: [PATCH 2/4] Address comments --- ...hemaVisitor.java => BaseWriteBuilder.java} | 13 +- .../iceberg/avro/GenericAvroWriter.java | 8 +- .../apache/iceberg/avro/InternalWriter.java | 8 +- .../org/apache/iceberg/avro/ValueWriters.java | 23 ++- .../apache/iceberg/avro/AvroTestHelpers.java | 28 ++- .../apache/iceberg/avro/RandomAvroData.java | 163 +++++++++++++----- .../apache/iceberg/avro/TestInternalAvro.java | 63 +++++++ .../iceberg/avro/TestInternalWriter.java | 133 -------------- 8 files changed, 246 insertions(+), 193 deletions(-) rename core/src/main/java/org/apache/iceberg/avro/{BaseAvroSchemaVisitor.java => BaseWriteBuilder.java} (91%) create mode 100644 core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java delete mode 100644 core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java diff --git a/core/src/main/java/org/apache/iceberg/avro/BaseAvroSchemaVisitor.java b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java similarity index 91% rename from core/src/main/java/org/apache/iceberg/avro/BaseAvroSchemaVisitor.java rename to core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java index 387df863f7c1..f8a2bc604656 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BaseAvroSchemaVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java @@ -24,11 +24,11 @@ import org.apache.avro.Schema; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -abstract class BaseAvroSchemaVisitor extends AvroSchemaVisitor> { +abstract class BaseWriteBuilder extends AvroSchemaVisitor> { protected abstract ValueWriter createRecordWriter(List> fields); - protected abstract ValueWriter fixedWriter(int size); + protected abstract ValueWriter fixedWriter(int length); @Override public ValueWriter record(Schema record, List names, List> fields) { @@ -37,16 +37,15 @@ public ValueWriter record(Schema record, List names, List union(Schema union, List> options) { - Preconditions.checkArgument( - options.contains(ValueWriters.nulls()), - "Cannot create writer for non-option union: %s", - union); Preconditions.checkArgument( options.size() == 2, "Cannot create writer for non-option union: %s", union); if (union.getTypes().get(0).getType() == Schema.Type.NULL) { return ValueWriters.option(0, options.get(1)); - } else { + } else if (union.getTypes().get(1).getType() == Schema.Type.NULL) { return ValueWriters.option(1, options.get(0)); + } else { + throw new IllegalArgumentException( + String.format("Cannot create writer for non-option union: %s", union)); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java index ac3b9a275f04..316dd94d2f68 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java @@ -39,7 +39,7 @@ public static GenericAvroWriter create(Schema schema) { @Override @SuppressWarnings("unchecked") public void setSchema(Schema schema) { - this.writer = (ValueWriter) AvroSchemaVisitor.visit(schema, new GenericAvroSchemaVisitor()); + this.writer = (ValueWriter) AvroSchemaVisitor.visit(schema, new WriteBuilder()); } @Override @@ -52,7 +52,7 @@ public Stream metrics() { return writer.metrics(); } - private static class GenericAvroSchemaVisitor extends BaseAvroSchemaVisitor { + private static class WriteBuilder extends BaseWriteBuilder { @Override protected ValueWriter createRecordWriter(List> fields) { @@ -60,8 +60,8 @@ protected ValueWriter createRecordWriter(List> fields) { } @Override - protected ValueWriter fixedWriter(int size) { - return ValueWriters.genericFixed(size); + protected ValueWriter fixedWriter(int length) { + return ValueWriters.genericFixed(length); } } } diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java b/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java index 56d3f4ac5cc0..8f20aeeb6bfd 100644 --- a/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java @@ -46,7 +46,7 @@ public static InternalWriter create(Schema schema) { @Override @SuppressWarnings("unchecked") public void setSchema(Schema schema) { - this.writer = (ValueWriter) AvroSchemaVisitor.visit(schema, new GenericAvroSchemaVisitor()); + this.writer = (ValueWriter) AvroSchemaVisitor.visit(schema, new WriteBuilder()); } @Override @@ -59,7 +59,7 @@ public Stream metrics() { return writer.metrics(); } - private static class GenericAvroSchemaVisitor extends BaseAvroSchemaVisitor { + private static class WriteBuilder extends BaseWriteBuilder { @Override protected ValueWriter createRecordWriter(List> fields) { @@ -67,8 +67,8 @@ protected ValueWriter createRecordWriter(List> fields) { } @Override - protected ValueWriter fixedWriter(int size) { - return ValueWriters.byteBuffers(); + protected ValueWriter fixedWriter(int length) { + return ValueWriters.fixedBuffers(length); } } } diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java index b799cc760476..afc0a37a2838 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java @@ -93,6 +93,10 @@ public static ValueWriter genericFixed(int length) { return new GenericFixedWriter(length); } + public static ValueWriter fixedBuffers(int length) { + return new FixedByteBufferWriter(length); + } + public static ValueWriter bytes() { return BytesWriter.INSTANCE; } @@ -332,6 +336,24 @@ public void write(ByteBuffer bytes, Encoder encoder) throws IOException { } } + private static class FixedByteBufferWriter implements ValueWriter { + private final int length; + + private FixedByteBufferWriter(int length) { + this.length = length; + } + + @Override + public void write(ByteBuffer bytes, Encoder encoder) throws IOException { + Preconditions.checkArgument( + bytes.remaining() == length, + "Cannot write byte buffer of length %s as fixed[%s]", + bytes.remaining(), + length); + encoder.writeBytes(bytes); + } + } + private static class DecimalWriter implements ValueWriter { private final int precision; private final int scale; @@ -491,7 +513,6 @@ protected Object get(IndexedRecord struct, int pos) { } private static class StructLikeWriter extends StructWriter { - @SuppressWarnings("unchecked") private StructLikeWriter(List> writers) { super(writers); } diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java index 03108376eb4b..f64623f452f9 100644 --- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java @@ -27,6 +27,7 @@ import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; +import org.apache.iceberg.StructLike; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -78,6 +79,18 @@ static void assertEquals(Types.StructType struct, Record expected, Record actual } } + static void assertEquals(Types.StructType struct, StructLike expected, StructLike actual) { + List fields = struct.fields(); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + + Object expectedValue = expected.get(i, Object.class); + Object actualValue = actual.get(i, Object.class); + + assertEquals(fieldType, expectedValue, actualValue); + } + } + static void assertEquals(Types.ListType list, List expected, List actual) { Type elementType = list.elementType(); @@ -126,9 +139,18 @@ private static void assertEquals(Type type, Object expected, Object actual) { assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); break; case STRUCT: - assertThat(expected).as("Expected should be a Record").isInstanceOf(Record.class); - assertThat(actual).as("Actual should be a Record").isInstanceOf(Record.class); - assertEquals(type.asStructType(), (Record) expected, (Record) actual); + assertThat(expected) + .as("Expected should be a Record or StructLike") + .isInstanceOfAny(Record.class, StructLike.class); + + if (expected instanceof Record) { + assertThat(actual).as("Actual should be a Record").isInstanceOf(Record.class); + assertEquals(type.asStructType(), (Record) expected, (Record) actual); + } else { + assertThat(actual).as("Actual should be a GenericRecord").isInstanceOf(StructLike.class); + assertEquals(type.asStructType(), (StructLike) expected, (StructLike) actual); + } + break; case LIST: assertThat(expected).as("Expected should be a List").isInstanceOf(List.class); diff --git a/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java b/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java index 473a468f7e94..212f178a306a 100644 --- a/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java +++ b/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java @@ -29,6 +29,8 @@ import org.apache.avro.generic.GenericData.Record; import org.apache.avro.util.Utf8; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -51,6 +53,66 @@ public static List generate(Schema schema, int numRecords, long seed) { return records; } + public static List generateStructLike(Schema schema, int numRecords, long seed) { + RandomInternalDataGenerator generator = new RandomInternalDataGenerator(seed); + List records = Lists.newArrayListWithExpectedSize(numRecords); + for (int i = 0; i < numRecords; i += 1) { + records.add((StructLike) TypeUtil.visit(schema, generator)); + } + + return records; + } + + private static List generateList( + Random random, Types.ListType list, Supplier elementResult) { + int numElements = random.nextInt(20); + + List result = Lists.newArrayListWithExpectedSize(numElements); + for (int i = 0; i < numElements; i += 1) { + // return null 5% of the time when the value is optional + if (list.isElementOptional() && random.nextInt(20) == 1) { + result.add(null); + } else { + result.add(elementResult.get()); + } + } + + return result; + } + + private static Map generateMap( + Random random, Types.MapType map, Supplier keyResult, Supplier valueResult) { + int numEntries = random.nextInt(20); + + Map result = Maps.newLinkedHashMap(); + Supplier keyFunc; + if (map.keyType() == Types.StringType.get()) { + keyFunc = () -> keyResult.get().toString(); + } else { + keyFunc = keyResult; + } + + Set keySet = Sets.newHashSet(); + for (int i = 0; i < numEntries; i += 1) { + Object key = keyFunc.get(); + // ensure no collisions + while (keySet.contains(key)) { + key = keyFunc.get(); + } + + keySet.add(key); + + // return null 5% of the time when the value is optional + if (map.isValueOptional() && random.nextInt(20) == 1) { + result.put(key, null); + } else { + result.put(key, valueResult.get()); + } + } + + return result; + } + private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { private final Map typeToSchema; private final Random random; @@ -88,64 +150,83 @@ public Object field(Types.NestedField field, Supplier fieldResult) { @Override public Object list(Types.ListType list, Supplier elementResult) { - int numElements = random.nextInt(20); - - List result = Lists.newArrayListWithExpectedSize(numElements); - for (int i = 0; i < numElements; i += 1) { - // return null 5% of the time when the value is optional - if (list.isElementOptional() && random.nextInt(20) == 1) { - result.add(null); - } else { - result.add(elementResult.get()); - } + return generateList(random, list, elementResult); + } + + @Override + public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { + return generateMap(random, map, keyResult, valueResult); + } + + @Override + public Object primitive(Type.PrimitiveType primitive) { + Object result = RandomUtil.generatePrimitive(primitive, random); + // For the primitives that Avro needs a different type than Spark, fix + // them here. + switch (primitive.typeId()) { + case STRING: + return new Utf8((String) result); + case FIXED: + return new GenericData.Fixed(typeToSchema.get(primitive), (byte[]) result); + case BINARY: + return ByteBuffer.wrap((byte[]) result); + case UUID: + return UUID.nameUUIDFromBytes((byte[]) result); + default: + return result; } + } + } - return result; + private static class RandomInternalDataGenerator + extends TypeUtil.CustomOrderSchemaVisitor { + private final Random random; + + private RandomInternalDataGenerator(long seed) { + this.random = new Random(seed); } @Override - public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { - int numEntries = random.nextInt(20); + public StructLike schema(Schema schema, Supplier structResult) { + return (StructLike) structResult.get(); + } - Map result = Maps.newLinkedHashMap(); - Supplier keyFunc; - if (map.keyType() == Types.StringType.get()) { - keyFunc = () -> keyResult.get().toString(); - } else { - keyFunc = keyResult; + @Override + public StructLike struct(Types.StructType struct, Iterable fieldResults) { + StructLike rec = GenericRecord.create(struct); + List values = Lists.newArrayList(fieldResults); + for (int i = 0; i < values.size(); i += 1) { + rec.set(i, values.get(i)); } - Set keySet = Sets.newHashSet(); - for (int i = 0; i < numEntries; i += 1) { - Object key = keyFunc.get(); - // ensure no collisions - while (keySet.contains(key)) { - key = keyFunc.get(); - } - - keySet.add(key); - - // return null 5% of the time when the value is optional - if (map.isValueOptional() && random.nextInt(20) == 1) { - result.put(key, null); - } else { - result.put(key, valueResult.get()); - } + return rec; + } + + @Override + public Object field(Types.NestedField field, Supplier fieldResult) { + // return null 5% of the time when the value is optional + if (field.isOptional() && random.nextInt(20) == 1) { + return null; } + return fieldResult.get(); + } + + @Override + public Object list(Types.ListType list, Supplier elementResult) { + return generateList(random, list, elementResult); + } - return result; + @Override + public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { + return generateMap(random, map, keyResult, valueResult); } @Override public Object primitive(Type.PrimitiveType primitive) { Object result = RandomUtil.generatePrimitive(primitive, random); - // For the primitives that Avro needs a different type than Spark, fix - // them here. + switch (primitive.typeId()) { - case STRING: - return new Utf8((String) result); case FIXED: - return new GenericData.Fixed(typeToSchema.get(primitive), (byte[]) result); case BINARY: return ByteBuffer.wrap((byte[]) result); case UUID: diff --git a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java new file mode 100644 index 000000000000..d542bf6fc03c --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class TestInternalAvro extends AvroDataTest { + @Override + protected void writeAndValidate(Schema schema) throws IOException { + List expected = RandomAvroData.generateStructLike(schema, 100, 0L); + + OutputFile outputFile = new InMemoryOutputFile(); + + try (DataWriter dataWriter = + Avro.writeData(outputFile) + .schema(schema) + .createWriterFunc(InternalWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build()) { + for (StructLike rec : expected) { + dataWriter.write(rec); + } + } + + List rows; + try (AvroIterable reader = + Avro.read(outputFile.toInputFile()) + .project(schema) + .createResolvingReader(InternalReader::create) + .build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + AvroTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java b/core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java deleted file mode 100644 index b1d22b478765..000000000000 --- a/core/src/test/java/org/apache/iceberg/avro/TestInternalWriter.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.avro; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.List; -import java.util.UUID; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileContent; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Files; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.expressions.Literal; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Comparators; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.StructProjection; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -public class TestInternalWriter { - - @TempDir Path temp; - - @Test - public void testDataWriter() throws IOException { - Schema schema = - new Schema( - required(100, "b", Types.BooleanType.get()), - optional(101, "i", Types.IntegerType.get()), - required(102, "l", Types.LongType.get()), - optional(103, "f", Types.FloatType.get()), - required(104, "d", Types.DoubleType.get()), - optional(105, "date", Types.DateType.get()), - required(106, "time", Types.TimeType.get()), - required(107, "ts", Types.TimestampType.withoutZone()), - required(108, "ts_tz", Types.TimestampType.withZone()), - required(109, "s", Types.StringType.get()), - required(110, "uuid", Types.UUIDType.get()), - required(111, "fixed", Types.FixedType.ofLength(7)), - optional(112, "bytes", Types.BinaryType.get()), - required(113, "dec_38_10", Types.DecimalType.of(38, 10))); - - // Consuming the data as per Type.java - GenericRecord record = GenericRecord.create(schema); - record.set(0, true); - record.set(1, 42); - record.set(2, 42L); - record.set(3, 3.14f); - record.set(4, 3.141592653589793); - record.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value()); - record.set(6, Literal.of("10:10:10").to(Types.TimeType.get()).value()); - record.set( - 7, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value()); - record.set( - 8, - Literal.of("2017-11-29T11:30:07.123456+01:00").to(Types.TimestampType.withZone()).value()); - record.set(9, "string"); - record.set(10, UUID.randomUUID()); - record.set(11, ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6})); - record.set(12, ByteBuffer.wrap(new byte[] {1, 2, 3})); - record.set( - 13, Literal.of("12345678901234567890.1234567890").to(Types.DecimalType.of(38, 10)).value()); - - StructProjection structProjection = StructProjection.create(schema, schema); - StructProjection row = structProjection.wrap(record); - - OutputFile file = Files.localOutput(temp.toFile()); - - DataWriter dataWriter = - Avro.writeData(file) - .schema(schema) - .createWriterFunc(InternalWriter::create) - .overwrite() - .withSpec(PartitionSpec.unpartitioned()) - .build(); - - try { - dataWriter.write(row); - } finally { - dataWriter.close(); - } - - DataFile dataFile = dataWriter.toDataFile(); - - assertThat(dataFile.format()).as("Format should be Avro").isEqualTo(FileFormat.AVRO); - assertThat(dataFile.content()).as("Should be data file").isEqualTo(FileContent.DATA); - assertThat(dataFile.partition().size()).as("Partition should be empty").isEqualTo(0); - assertThat(dataFile.keyMetadata()).as("Key metadata should be null").isNull(); - - List writtenRecords; - try (AvroIterable reader = - Avro.read(file.toInputFile()) - .project(schema) - .createResolvingReader(InternalReader::create) - .build()) { - writtenRecords = Lists.newArrayList(reader); - } - assertThat(writtenRecords).hasSize(1); - Comparator structLikeComparator = Comparators.forType(schema.asStruct()); - assertThat(structLikeComparator.compare(writtenRecords.get(0), row)) - .as("Written records should match") - .isZero(); - } -} From a1a1c40a1d3323e662aeebd3a60274f46dd9f099 Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Thu, 9 Jan 2025 05:39:57 +0530 Subject: [PATCH 3/4] Address test comments --- .../org/apache/iceberg/util/RandomUtil.java | 57 ++++++++ .../apache/iceberg/InternalTestHelpers.java | 110 +++++++++++++++ .../apache/iceberg/RandomInternalData.java | 104 ++++++++++++++ .../apache/iceberg/avro/AvroTestHelpers.java | 28 +--- .../apache/iceberg/avro/RandomAvroData.java | 128 +----------------- .../apache/iceberg/avro/TestInternalAvro.java | 7 +- 6 files changed, 281 insertions(+), 153 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/InternalTestHelpers.java create mode 100644 core/src/test/java/org/apache/iceberg/RandomInternalData.java diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java index 42dd6825a4d5..7414a457c520 100644 --- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java +++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java @@ -21,7 +21,14 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -228,4 +235,54 @@ private static BigInteger randomUnscaled(int precision, Random random) { return new BigInteger(sb.toString()); } + + public static List generateList( + Random random, Types.ListType list, Supplier elementResult) { + int numElements = random.nextInt(20); + + List result = Lists.newArrayListWithExpectedSize(numElements); + for (int i = 0; i < numElements; i += 1) { + // return null 5% of the time when the value is optional + if (list.isElementOptional() && random.nextInt(20) == 1) { + result.add(null); + } else { + result.add(elementResult.get()); + } + } + + return result; + } + + public static Map generateMap( + Random random, Types.MapType map, Supplier keyResult, Supplier valueResult) { + int numEntries = random.nextInt(20); + + Map result = Maps.newLinkedHashMap(); + Supplier keyFunc; + if (map.keyType() == Types.StringType.get()) { + keyFunc = () -> keyResult.get().toString(); + } else { + keyFunc = keyResult; + } + + Set keySet = Sets.newHashSet(); + for (int i = 0; i < numEntries; i += 1) { + Object key = keyFunc.get(); + // ensure no collisions + while (keySet.contains(key)) { + key = keyFunc.get(); + } + + keySet.add(key); + + // return null 5% of the time when the value is optional + if (map.isValueOptional() && random.nextInt(20) == 1) { + result.put(key, null); + } else { + result.put(key, valueResult.get()); + } + } + + return result; + } } diff --git a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java new file mode 100644 index 000000000000..7b7400b69ab4 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +public class InternalTestHelpers { + + private InternalTestHelpers() {} + + public static void assertEquals(Types.StructType struct, StructLike expected, StructLike actual) { + List fields = struct.fields(); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + + Object expectedValue = expected.get(i, fieldType.typeId().javaClass()); + Object actualValue = actual.get(i, fieldType.typeId().javaClass()); + + assertEquals(fieldType, expectedValue, actualValue); + } + } + + public static void assertEquals(Types.ListType list, List expected, List actual) { + Type elementType = list.elementType(); + + assertThat(actual).as("List size should match").hasSameSizeAs(expected); + + for (int i = 0; i < expected.size(); i += 1) { + Object expectedValue = expected.get(i); + Object actualValue = actual.get(i); + + assertEquals(elementType, expectedValue, actualValue); + } + } + + public static void assertEquals(Types.MapType map, Map expected, Map actual) { + Type valueType = map.valueType(); + + assertThat(actual).as("Map keys should match").hasSameSizeAs(expected); + + for (Object expectedKey : expected.keySet()) { + Object expectedValue = expected.get(expectedKey); + Object actualValue = actual.get(expectedKey); + + assertEquals(valueType, expectedValue, actualValue); + } + } + + private static void assertEquals(Type type, Object expected, Object actual) { + if (expected == null && actual == null) { + return; + } + + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case LONG: + case FLOAT: + case DOUBLE: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: + case UUID: + case FIXED: + case BINARY: + case DECIMAL: + assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); + break; + case STRUCT: + assertThat(expected).as("Expected should be a StructLike").isInstanceOf(StructLike.class); + assertThat(actual).as("Actual should be a StructLike").isInstanceOf(StructLike.class); + assertEquals(type.asStructType(), (StructLike) expected, (StructLike) actual); + break; + case LIST: + assertThat(expected).as("Expected should be a List").isInstanceOf(List.class); + assertThat(actual).as("Actual should be a List").isInstanceOf(List.class); + assertEquals(type.asListType(), (List) expected, (List) actual); + break; + case MAP: + assertThat(expected).as("Expected should be a Map").isInstanceOf(Map.class); + assertThat(actual).as("Actual should be a Map").isInstanceOf(Map.class); + assertEquals(type.asMapType(), (Map) expected, (Map) actual); + break; + default: + throw new IllegalArgumentException("Not a supported type: " + type); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/RandomInternalData.java b/core/src/test/java/org/apache/iceberg/RandomInternalData.java new file mode 100644 index 000000000000..a7de8e4c8f01 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/RandomInternalData.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.function.Supplier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.RandomUtil; + +public class RandomInternalData { + + private RandomInternalData() {} + + public static List generate(Schema schema, int numRecords, long seed) { + RandomDataGenerator generator = new RandomDataGenerator(seed); + List records = Lists.newArrayListWithExpectedSize(numRecords); + for (int i = 0; i < numRecords; i += 1) { + records.add((StructLike) TypeUtil.visit(schema, generator)); + } + + return records; + } + + private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { + private final Random random; + + private RandomDataGenerator(long seed) { + this.random = new Random(seed); + } + + @Override + public StructLike schema(Schema schema, Supplier structResult) { + return (StructLike) structResult.get(); + } + + @Override + public StructLike struct(Types.StructType struct, Iterable fieldResults) { + StructLike rec = GenericRecord.create(struct); + List values = Lists.newArrayList(fieldResults); + for (int i = 0; i < values.size(); i += 1) { + rec.set(i, values.get(i)); + } + + return rec; + } + + @Override + public Object field(Types.NestedField field, Supplier fieldResult) { + // return null 5% of the time when the value is optional + if (field.isOptional() && random.nextInt(20) == 1) { + return null; + } + return fieldResult.get(); + } + + @Override + public Object list(Types.ListType list, Supplier elementResult) { + return RandomUtil.generateList(random, list, elementResult); + } + + @Override + public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { + return RandomUtil.generateMap(random, map, keyResult, valueResult); + } + + @Override + public Object primitive(Type.PrimitiveType primitive) { + Object result = RandomUtil.generatePrimitive(primitive, random); + + switch (primitive.typeId()) { + case FIXED: + case BINARY: + return ByteBuffer.wrap((byte[]) result); + case UUID: + return UUID.nameUUIDFromBytes((byte[]) result); + default: + return result; + } + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java index f64623f452f9..03108376eb4b 100644 --- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java @@ -27,7 +27,6 @@ import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; -import org.apache.iceberg.StructLike; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -79,18 +78,6 @@ static void assertEquals(Types.StructType struct, Record expected, Record actual } } - static void assertEquals(Types.StructType struct, StructLike expected, StructLike actual) { - List fields = struct.fields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i).type(); - - Object expectedValue = expected.get(i, Object.class); - Object actualValue = actual.get(i, Object.class); - - assertEquals(fieldType, expectedValue, actualValue); - } - } - static void assertEquals(Types.ListType list, List expected, List actual) { Type elementType = list.elementType(); @@ -139,18 +126,9 @@ private static void assertEquals(Type type, Object expected, Object actual) { assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); break; case STRUCT: - assertThat(expected) - .as("Expected should be a Record or StructLike") - .isInstanceOfAny(Record.class, StructLike.class); - - if (expected instanceof Record) { - assertThat(actual).as("Actual should be a Record").isInstanceOf(Record.class); - assertEquals(type.asStructType(), (Record) expected, (Record) actual); - } else { - assertThat(actual).as("Actual should be a GenericRecord").isInstanceOf(StructLike.class); - assertEquals(type.asStructType(), (StructLike) expected, (StructLike) actual); - } - + assertThat(expected).as("Expected should be a Record").isInstanceOf(Record.class); + assertThat(actual).as("Actual should be a Record").isInstanceOf(Record.class); + assertEquals(type.asStructType(), (Record) expected, (Record) actual); break; case LIST: assertThat(expected).as("Expected should be a List").isInstanceOf(List.class); diff --git a/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java b/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java index 212f178a306a..ebb9d93a5342 100644 --- a/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java +++ b/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java @@ -22,18 +22,13 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.util.Utf8; import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; @@ -53,66 +48,6 @@ public static List generate(Schema schema, int numRecords, long seed) { return records; } - public static List generateStructLike(Schema schema, int numRecords, long seed) { - RandomInternalDataGenerator generator = new RandomInternalDataGenerator(seed); - List records = Lists.newArrayListWithExpectedSize(numRecords); - for (int i = 0; i < numRecords; i += 1) { - records.add((StructLike) TypeUtil.visit(schema, generator)); - } - - return records; - } - - private static List generateList( - Random random, Types.ListType list, Supplier elementResult) { - int numElements = random.nextInt(20); - - List result = Lists.newArrayListWithExpectedSize(numElements); - for (int i = 0; i < numElements; i += 1) { - // return null 5% of the time when the value is optional - if (list.isElementOptional() && random.nextInt(20) == 1) { - result.add(null); - } else { - result.add(elementResult.get()); - } - } - - return result; - } - - private static Map generateMap( - Random random, Types.MapType map, Supplier keyResult, Supplier valueResult) { - int numEntries = random.nextInt(20); - - Map result = Maps.newLinkedHashMap(); - Supplier keyFunc; - if (map.keyType() == Types.StringType.get()) { - keyFunc = () -> keyResult.get().toString(); - } else { - keyFunc = keyResult; - } - - Set keySet = Sets.newHashSet(); - for (int i = 0; i < numEntries; i += 1) { - Object key = keyFunc.get(); - // ensure no collisions - while (keySet.contains(key)) { - key = keyFunc.get(); - } - - keySet.add(key); - - // return null 5% of the time when the value is optional - if (map.isValueOptional() && random.nextInt(20) == 1) { - result.put(key, null); - } else { - result.put(key, valueResult.get()); - } - } - - return result; - } - private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { private final Map typeToSchema; private final Random random; @@ -150,12 +85,12 @@ public Object field(Types.NestedField field, Supplier fieldResult) { @Override public Object list(Types.ListType list, Supplier elementResult) { - return generateList(random, list, elementResult); + return RandomUtil.generateList(random, list, elementResult); } @Override public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { - return generateMap(random, map, keyResult, valueResult); + return RandomUtil.generateMap(random, map, keyResult, valueResult); } @Override @@ -177,63 +112,4 @@ public Object primitive(Type.PrimitiveType primitive) { } } } - - private static class RandomInternalDataGenerator - extends TypeUtil.CustomOrderSchemaVisitor { - private final Random random; - - private RandomInternalDataGenerator(long seed) { - this.random = new Random(seed); - } - - @Override - public StructLike schema(Schema schema, Supplier structResult) { - return (StructLike) structResult.get(); - } - - @Override - public StructLike struct(Types.StructType struct, Iterable fieldResults) { - StructLike rec = GenericRecord.create(struct); - List values = Lists.newArrayList(fieldResults); - for (int i = 0; i < values.size(); i += 1) { - rec.set(i, values.get(i)); - } - - return rec; - } - - @Override - public Object field(Types.NestedField field, Supplier fieldResult) { - // return null 5% of the time when the value is optional - if (field.isOptional() && random.nextInt(20) == 1) { - return null; - } - return fieldResult.get(); - } - - @Override - public Object list(Types.ListType list, Supplier elementResult) { - return generateList(random, list, elementResult); - } - - @Override - public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { - return generateMap(random, map, keyResult, valueResult); - } - - @Override - public Object primitive(Type.PrimitiveType primitive) { - Object result = RandomUtil.generatePrimitive(primitive, random); - - switch (primitive.typeId()) { - case FIXED: - case BINARY: - return ByteBuffer.wrap((byte[]) result); - case UUID: - return UUID.nameUUIDFromBytes((byte[]) result); - default: - return result; - } - } - } } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java index d542bf6fc03c..773309a73d4f 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.util.List; +import org.apache.iceberg.InternalTestHelpers; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RandomInternalData; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.inmemory.InMemoryOutputFile; @@ -31,7 +33,8 @@ public class TestInternalAvro extends AvroDataTest { @Override protected void writeAndValidate(Schema schema) throws IOException { - List expected = RandomAvroData.generateStructLike(schema, 100, 0L); + List expected = + RandomInternalData.generate(schema, 100, System.currentTimeMillis()); OutputFile outputFile = new InMemoryOutputFile(); @@ -57,7 +60,7 @@ protected void writeAndValidate(Schema schema) throws IOException { } for (int i = 0; i < expected.size(); i += 1) { - AvroTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); + InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); } } } From 2f4d8df301984bd249d6b700068b6a9f9cdcf7fc Mon Sep 17 00:00:00 2001 From: ajantha-bhat Date: Fri, 10 Jan 2025 06:57:52 +0530 Subject: [PATCH 4/4] Update random seed --- .../test/java/org/apache/iceberg/avro/TestInternalAvro.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java index 773309a73d4f..b48109737f7c 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java @@ -33,8 +33,7 @@ public class TestInternalAvro extends AvroDataTest { @Override protected void writeAndValidate(Schema schema) throws IOException { - List expected = - RandomInternalData.generate(schema, 100, System.currentTimeMillis()); + List expected = RandomInternalData.generate(schema, 100, 42L); OutputFile outputFile = new InMemoryOutputFile();